This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ccff3f5711 [bugfix](light weight schema change) support delete
condition in schema change (#11869)
ccff3f5711 is described below
commit ccff3f571100835bf2b810f95729a29c8749ad4a
Author: yiguolei <[email protected]>
AuthorDate: Fri Aug 26 11:45:55 2022 +0800
[bugfix](light weight schema change) support delete condition in schema
change (#11869)
* [bugfix](light weight schema change) support delete condition in schema
change
Co-authored-by: yiguolei <[email protected]>
---
be/src/exec/olap_scanner.cpp | 6 +-
be/src/olap/delete_handler.cpp | 39 ++--
be/src/olap/delete_handler.h | 6 +-
be/src/olap/merger.cpp | 20 +-
be/src/olap/olap_cond.cpp | 2 +-
be/src/olap/olap_cond.h | 8 +-
be/src/olap/predicate_creator.h | 47 ++++
be/src/olap/reader.cpp | 88 ++-----
be/src/olap/reader.h | 6 +-
be/src/olap/rowset/beta_rowset_writer.cpp | 3 +-
be/src/olap/rowset/beta_rowset_writer.h | 3 +-
be/src/olap/rowset/rowset_meta.h | 5 +-
be/src/olap/rowset/rowset_writer.h | 3 +-
be/src/olap/schema_change.cpp | 108 ++++-----
be/src/olap/schema_change.h | 29 +--
be/src/olap/tablet.cpp | 15 +-
be/src/olap/tablet.h | 7 +-
be/src/olap/tablet_meta.cpp | 13 ++
be/src/olap/tablet_meta.h | 4 +-
be/src/olap/tablet_schema.cpp | 53 ++++-
be/src/olap/tablet_schema.h | 24 +-
be/src/olap/tuple_reader.cpp | 2 +-
be/src/vec/exec/volap_scanner.cpp | 6 +
be/src/vec/olap/block_reader.cpp | 2 +-
be/test/olap/delete_handler_test.cpp | 107 +++++++--
be/test/olap/rowid_conversion_test.cpp | 78 ++++++-
be/test/olap/rowset/segment_v2/segment_test.cpp | 60 ++---
.../org/apache/doris/alter/SchemaChangeJobV2.java | 2 +-
gensrc/thrift/PaloInternalService.thrift | 3 +
...able_column_with_delete_drop_column_dup_key.out | 133 +++++++++++
...e_column_with_delete_drop_column_unique_key.out | 79 +++++++
...e_column_with_delete_drop_column_dup_key.groovy | 258 +++++++++++++++++++++
...olumn_with_delete_drop_column_unique_key.groovy | 183 +++++++++++++++
.../test_alter_table_column_with_delete.groovy | 4 +-
34 files changed, 1124 insertions(+), 282 deletions(-)
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 33a70e16db..e052125a49 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -206,7 +206,11 @@ Status OlapScanner::_init_tablet_reader_params(
std::copy(_tablet->delete_predicates().cbegin(),
_tablet->delete_predicates().cend(),
std::inserter(_tablet_reader_params.delete_predicates,
_tablet_reader_params.delete_predicates.begin()));
-
+ // Merge the columns in delete predicates that are not in the latest schema into the
current tablet schema
+ for (auto& del_pred_pb : _tablet_reader_params.delete_predicates) {
+ _tablet_schema->merge_dropped_columns(
+ _tablet->tablet_schema(Version(del_pred_pb.version(),
del_pred_pb.version())));
+ }
// Range
for (auto key_range : key_ranges) {
if (key_range->begin_scan_range.size() == 1 &&
diff --git a/be/src/olap/delete_handler.cpp b/be/src/olap/delete_handler.cpp
index 8bd87811f4..ae06bb97a3 100644
--- a/be/src/olap/delete_handler.cpp
+++ b/be/src/olap/delete_handler.cpp
@@ -30,7 +30,8 @@
#include "gen_cpp/olap_file.pb.h"
#include "olap/olap_common.h"
#include "olap/olap_cond.h"
-#include "olap/reader.h"
+#include "olap/predicate_creator.h"
+#include "olap/tablet.h"
#include "olap/utils.h"
using apache::thrift::ThriftDebugString;
@@ -237,52 +238,54 @@ bool DeleteHandler::_parse_condition(const std::string&
condition_str, TConditio
return true;
}
-Status DeleteHandler::init(TabletSchemaSPtr schema,
- const std::vector<DeletePredicatePB>&
delete_conditions, int64_t version,
- const TabletReader* reader) {
+Status DeleteHandler::init(std::shared_ptr<Tablet> tablet, TabletSchemaSPtr
tablet_schema,
+ const std::vector<DeletePredicatePB>&
delete_conditions,
+ int64_t version) {
DCHECK(!_is_inited) << "reinitialize delete handler.";
DCHECK(version >= 0) << "invalid parameters. version=" << version;
+ _predicate_mem_pool.reset(new MemPool());
for (const auto& delete_condition : delete_conditions) {
// Skip the delete condition with large version
if (delete_condition.version() > version) {
continue;
}
-
+ // Need the tablet schema at the delete condition's version to resolve the
correct column unique id
+ TabletSchemaSPtr delete_pred_related_schema = tablet->tablet_schema(
+ Version(delete_condition.version(),
delete_condition.version()));
DeleteConditions temp;
temp.filter_version = delete_condition.version();
- temp.del_cond = new (std::nothrow) Conditions();
+ temp.del_cond = new (std::nothrow) Conditions(tablet_schema);
if (temp.del_cond == nullptr) {
LOG(FATAL) << "fail to malloc Conditions. size=" <<
sizeof(Conditions);
return Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
}
-
- temp.del_cond->set_tablet_schema(schema);
for (const auto& sub_predicate : delete_condition.sub_predicates()) {
TCondition condition;
if (!_parse_condition(sub_predicate, &condition)) {
LOG(WARNING) << "fail to parse condition. [condition=" <<
sub_predicate << "]";
return
Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_PARAMETERS);
}
-
+ condition.__set_column_unique_id(
+
delete_pred_related_schema->column(condition.column_name).unique_id());
Status res = temp.del_cond->append_condition(condition);
if (!res.ok()) {
LOG(WARNING) << "fail to append condition.res = " << res;
return res;
}
-
- if (reader != nullptr) {
- auto predicate = reader->_parse_to_predicate(condition, true);
- if (predicate != nullptr) {
- temp.column_predicate_vec.push_back(predicate);
- }
+ auto predicate =
+ parse_to_predicate(tablet_schema, condition,
_predicate_mem_pool.get(), true);
+ if (predicate != nullptr) {
+ temp.column_predicate_vec.push_back(predicate);
}
}
for (const auto& in_predicate : delete_condition.in_predicates()) {
TCondition condition;
condition.__set_column_name(in_predicate.column_name());
+ condition.__set_column_unique_id(
+
delete_pred_related_schema->column(condition.column_name).unique_id());
if (in_predicate.is_not_in()) {
condition.__set_condition_op("!*=");
} else {
@@ -296,10 +299,8 @@ Status DeleteHandler::init(TabletSchemaSPtr schema,
LOG(WARNING) << "fail to append condition.res = " << res;
return res;
}
-
- if (reader != nullptr) {
-
temp.column_predicate_vec.push_back(reader->_parse_to_predicate(condition,
true));
- }
+ temp.column_predicate_vec.push_back(
+ parse_to_predicate(tablet_schema, condition,
_predicate_mem_pool.get(), true));
}
_del_conds.emplace_back(std::move(temp));
diff --git a/be/src/olap/delete_handler.h b/be/src/olap/delete_handler.h
index 02e015a0e0..64eb8b2a7a 100644
--- a/be/src/olap/delete_handler.h
+++ b/be/src/olap/delete_handler.h
@@ -31,6 +31,7 @@ namespace doris {
class Conditions;
class RowCursor;
+class Tablet;
class TabletReader;
class TabletSchema;
@@ -90,8 +91,8 @@ public:
// return:
// * Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_PARAMETERS):
input parameters are not valid
// * Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR): alloc memory
failed
- Status init(TabletSchemaSPtr schema, const std::vector<DeletePredicatePB>&
delete_conditions,
- int64_t version, const doris::TabletReader* = nullptr);
+ Status init(std::shared_ptr<Tablet> tablet, TabletSchemaSPtr tablet_schema,
+ const std::vector<DeletePredicatePB>& delete_conditions,
int64_t version);
// Return the delete conditions' size.
size_t conditions_num() const { return _del_conds.size(); }
@@ -118,6 +119,7 @@ private:
bool _is_inited = false;
// DeleteConditions in _del_conds are in 'OR' relationship
std::vector<DeleteConditions> _del_conds;
+ std::unique_ptr<MemPool> _predicate_mem_pool;
DISALLOW_COPY_AND_ASSIGN(DeleteHandler);
};
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index fa7b99a973..4ee7d18796 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -47,8 +47,14 @@ Status Merger::merge_rowsets(TabletSharedPtr tablet,
ReaderType reader_type,
std::inserter(reader_params.delete_predicates,
reader_params.delete_predicates.begin()));
}
-
- reader_params.tablet_schema = cur_tablet_schema;
+ TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
+ merge_tablet_schema->copy_from(*cur_tablet_schema);
+ // Merge the columns in delete predicates that are not in the latest schema into the
current tablet schema
+ for (auto& del_pred_pb : reader_params.delete_predicates) {
+ merge_tablet_schema->merge_dropped_columns(
+ tablet->tablet_schema(Version(del_pred_pb.version(),
del_pred_pb.version())));
+ }
+ reader_params.tablet_schema = merge_tablet_schema;
RETURN_NOT_OK(reader.init(reader_params));
RowCursor row_cursor;
@@ -108,14 +114,20 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet,
ReaderType reader_type,
reader_params.reader_type = reader_type;
reader_params.rs_readers = src_rowset_readers;
reader_params.version = dst_rowset_writer->version();
- reader_params.tablet_schema = cur_tablet_schema;
{
std::shared_lock rdlock(tablet->get_header_lock());
std::copy(tablet->delete_predicates().cbegin(),
tablet->delete_predicates().cend(),
std::inserter(reader_params.delete_predicates,
reader_params.delete_predicates.begin()));
}
-
+ TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
+ merge_tablet_schema->copy_from(*cur_tablet_schema);
+ // Merge the columns in delete predicates that are not in the latest schema into the
current tablet schema
+ for (auto& del_pred_pb : reader_params.delete_predicates) {
+ merge_tablet_schema->merge_dropped_columns(
+ tablet->tablet_schema(Version(del_pred_pb.version(),
del_pred_pb.version())));
+ }
+ reader_params.tablet_schema = merge_tablet_schema;
if (tablet->enable_unique_key_merge_on_write()) {
reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
}
diff --git a/be/src/olap/olap_cond.cpp b/be/src/olap/olap_cond.cpp
index 02e818331b..1b8c286128 100644
--- a/be/src/olap/olap_cond.cpp
+++ b/be/src/olap/olap_cond.cpp
@@ -537,7 +537,7 @@ bool CondColumn::eval(const segment_v2::BloomFilter* bf)
const {
Status Conditions::append_condition(const TCondition& tcond) {
DCHECK(_schema != nullptr);
- int32_t index = _schema->field_index(tcond.column_name);
+ int32_t index = _schema->field_index(tcond.column_unique_id);
if (index < 0) {
LOG(WARNING) << "fail to get field index, field name=" <<
tcond.column_name;
return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
diff --git a/be/src/olap/olap_cond.h b/be/src/olap/olap_cond.h
index 4c6afe1c9d..0be1912f20 100644
--- a/be/src/olap/olap_cond.h
+++ b/be/src/olap/olap_cond.h
@@ -144,9 +144,10 @@ class Conditions {
public:
// Key: field index of condition's column
// Value: CondColumn object
+ // col_unique_id --> CondColumn
typedef std::map<int32_t, CondColumn*> CondColumns;
- Conditions() {}
+ Conditions(TabletSchemaSPtr schema) : _schema(schema) {}
~Conditions() { finalize(); }
void finalize() {
@@ -157,9 +158,6 @@ public:
}
bool empty() const { return _columns.empty(); }
- // TODO(yingchun): should do it in constructor
- void set_tablet_schema(TabletSchemaSPtr schema) { _schema = schema; }
-
// 如果成功,则_columns中增加一项,如果失败则无视此condition,同时输出日志
// 对于下列情况,将不会被处理
// 1. column不属于key列
@@ -168,7 +166,7 @@ public:
const CondColumns& columns() const { return _columns; }
- CondColumn* get_column(int32_t cid) const;
+ CondColumn* get_column(int32_t col_unique_id) const;
private:
bool _cond_column_is_key_or_duplicate(const CondColumn* cc) const {
diff --git a/be/src/olap/predicate_creator.h b/be/src/olap/predicate_creator.h
index 89c26fbd53..810c47d13d 100644
--- a/be/src/olap/predicate_creator.h
+++ b/be/src/olap/predicate_creator.h
@@ -22,9 +22,11 @@
#include "olap/column_predicate.h"
#include "olap/comparison_predicate.h"
#include "olap/in_list_predicate.h"
+#include "olap/null_predicate.h"
#include "olap/olap_cond.h"
#include "olap/tablet_schema.h"
#include "util/date_func.h"
+#include "util/string_util.h"
namespace doris {
@@ -260,4 +262,49 @@ inline ColumnPredicate* create_list_predicate(const
TabletColumn& column, int in
pool);
}
+// This method is called by the reader and by DeleteHandler.
+// When it is called by DeleteHandler, it should use the delete
predicate's tablet schema
+// to parse the conditions.
+inline ColumnPredicate* parse_to_predicate(TabletSchemaSPtr tablet_schema,
+ const TCondition& condition,
MemPool* mem_pool,
+ bool opposite = false) {
+ int32_t col_unique_id = condition.column_unique_id;
+ // TODO: not equal and not in predicate is not pushed down
+ const TabletColumn& column = tablet_schema->column_by_uid(col_unique_id);
+ uint32_t index = tablet_schema->field_index(col_unique_id);
+
+ if (to_lower(condition.condition_op) == "is") {
+ return new NullPredicate(index,
to_lower(condition.condition_values[0]) == "null",
+ opposite);
+ }
+
+ if ((condition.condition_op == "*=" || condition.condition_op == "!*=") &&
+ condition.condition_values.size() > 1) {
+ decltype(create_list_predicate<PredicateType::UNKNOWN>)* create =
nullptr;
+
+ if (condition.condition_op == "*=") {
+ create = create_list_predicate<PredicateType::IN_LIST>;
+ } else {
+ create = create_list_predicate<PredicateType::NOT_IN_LIST>;
+ }
+ return create(column, index, condition.condition_values, opposite,
mem_pool);
+ }
+
+ decltype(create_comparison_predicate<PredicateType::UNKNOWN>)* create =
nullptr;
+ if (condition.condition_op == "*=" || condition.condition_op == "=") {
+ create = create_comparison_predicate<PredicateType::EQ>;
+ } else if (condition.condition_op == "!*=" || condition.condition_op ==
"!=") {
+ create = create_comparison_predicate<PredicateType::NE>;
+ } else if (condition.condition_op == "<<") {
+ create = create_comparison_predicate<PredicateType::LT>;
+ } else if (condition.condition_op == "<=") {
+ create = create_comparison_predicate<PredicateType::LE>;
+ } else if (condition.condition_op == ">>") {
+ create = create_comparison_predicate<PredicateType::GT>;
+ } else if (condition.condition_op == ">=") {
+ create = create_comparison_predicate<PredicateType::GE>;
+ }
+ return create(column, index, condition.condition_values[0], opposite,
mem_pool);
+}
+
} //namespace doris
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 955d6699a6..9a79ca5b22 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -82,11 +82,6 @@ std::string TabletReader::KeysParam::to_string() const {
TabletReader::~TabletReader() {
VLOG_NOTICE << "merged rows:" << _merged_rows;
- _conditions.finalize();
- if (!_all_conditions.empty()) {
- _all_conditions.finalize();
- }
- _delete_handler.finalize();
for (auto pred : _col_predicates) {
delete pred;
@@ -97,11 +92,7 @@ TabletReader::~TabletReader() {
}
Status TabletReader::init(const ReaderParams& read_params) {
-#ifndef NDEBUG
- _predicate_mem_pool.reset(new MemPool());
-#else
_predicate_mem_pool.reset(new MemPool());
-#endif
Status res = _init_params(read_params);
if (!res.ok()) {
@@ -214,8 +205,8 @@ Status TabletReader::_capture_rs_readers(const
ReaderParams& read_params,
_orderby_key_columns.size() > 0 ? &_orderby_key_columns : nullptr;
_reader_context.load_bf_columns = &_load_bf_columns;
_reader_context.load_bf_all_columns = &_load_bf_all_columns;
- _reader_context.conditions = &_conditions;
- _reader_context.all_conditions = &_all_conditions;
+ _reader_context.conditions = _conditions.get();
+ _reader_context.all_conditions = _all_conditions.get();
_reader_context.predicates = &_col_predicates;
_reader_context.value_predicates = &_value_col_predicates;
_reader_context.lower_bound_keys = &_keys_param.start_keys;
@@ -447,20 +438,26 @@ Status TabletReader::_init_orderby_keys_param(const
ReaderParams& read_params) {
}
void TabletReader::_init_conditions_param(const ReaderParams& read_params) {
- _conditions.set_tablet_schema(_tablet_schema);
- _all_conditions.set_tablet_schema(_tablet_schema);
- for (const auto& condition : read_params.conditions) {
- ColumnPredicate* predicate = _parse_to_predicate(condition);
+ _conditions = std::make_unique<Conditions>(_tablet_schema);
+ _all_conditions = std::make_unique<Conditions>(_tablet_schema);
+ for (auto& condition : read_params.conditions) {
+ // These conditions are passed from OlapScannode without the column
unique id set, so set it here because it
+ // is too complicated to modify the related interfaces
+ TCondition tmp_cond = condition;
+ auto condition_col_uid =
_tablet_schema->column(tmp_cond.column_name).unique_id();
+ tmp_cond.__set_column_unique_id(condition_col_uid);
+ ColumnPredicate* predicate =
+ parse_to_predicate(_tablet_schema, tmp_cond,
_predicate_mem_pool.get());
if (predicate != nullptr) {
- if
(_tablet_schema->column(_tablet_schema->field_index(condition.column_name))
- .aggregation() !=
FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) {
+ if (_tablet_schema->column_by_uid(condition_col_uid).aggregation()
!=
+ FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) {
_value_col_predicates.push_back(predicate);
} else {
_col_predicates.push_back(predicate);
- Status status = _conditions.append_condition(condition);
+ Status status = _conditions->append_condition(tmp_cond);
DCHECK_EQ(Status::OK(), status);
}
- Status status = _all_conditions.append_condition(condition);
+ Status status = _all_conditions->append_condition(tmp_cond);
DCHECK_EQ(Status::OK(), status);
}
}
@@ -498,54 +495,9 @@ ColumnPredicate* TabletReader::_parse_to_predicate(const
FunctionFilter& functio
function_filter._string_param);
}
-ColumnPredicate* TabletReader::_parse_to_predicate(const TCondition& condition,
- bool opposite) const {
- // TODO: not equal and not in predicate is not pushed down
- int32_t index = _tablet_schema->field_index(condition.column_name);
- if (index < 0) {
- return nullptr;
- }
-
- const TabletColumn& column = _tablet_schema->column(index);
-
- if (to_lower(condition.condition_op) == "is") {
- return new NullPredicate(index,
to_lower(condition.condition_values[0]) == "null",
- opposite);
- }
-
- if ((condition.condition_op == "*=" || condition.condition_op == "!*=") &&
- condition.condition_values.size() > 1) {
- decltype(create_list_predicate<PredicateType::UNKNOWN>)* create =
nullptr;
-
- if (condition.condition_op == "*=") {
- create = create_list_predicate<PredicateType::IN_LIST>;
- } else {
- create = create_list_predicate<PredicateType::NOT_IN_LIST>;
- }
- return create(column, index, condition.condition_values, opposite,
- _predicate_mem_pool.get());
- }
-
- decltype(create_comparison_predicate<PredicateType::UNKNOWN>)* create =
nullptr;
- if (condition.condition_op == "*=" || condition.condition_op == "=") {
- create = create_comparison_predicate<PredicateType::EQ>;
- } else if (condition.condition_op == "!*=" || condition.condition_op ==
"!=") {
- create = create_comparison_predicate<PredicateType::NE>;
- } else if (condition.condition_op == "<<") {
- create = create_comparison_predicate<PredicateType::LT>;
- } else if (condition.condition_op == "<=") {
- create = create_comparison_predicate<PredicateType::LE>;
- } else if (condition.condition_op == ">>") {
- create = create_comparison_predicate<PredicateType::GT>;
- } else if (condition.condition_op == ">=") {
- create = create_comparison_predicate<PredicateType::GE>;
- }
- return create(column, index, condition.condition_values[0], opposite,
- _predicate_mem_pool.get());
-}
void TabletReader::_init_load_bf_columns(const ReaderParams& read_params) {
- _init_load_bf_columns(read_params, &_conditions, &_load_bf_columns);
- _init_load_bf_columns(read_params, &_all_conditions,
&_load_bf_all_columns);
+ _init_load_bf_columns(read_params, _conditions.get(), &_load_bf_columns);
+ _init_load_bf_columns(read_params, _all_conditions.get(),
&_load_bf_all_columns);
}
void TabletReader::_init_load_bf_columns(const ReaderParams& read_params,
Conditions* conditions,
@@ -615,8 +567,8 @@ Status TabletReader::_init_delete_condition(const
ReaderParams& read_params) {
_filter_delete = true;
}
- return _delete_handler.init(_tablet_schema, read_params.delete_predicates,
- read_params.version.second, this);
+ return _delete_handler.init(_tablet, _tablet_schema,
read_params.delete_predicates,
+ read_params.version.second);
}
} // namespace doris
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 8fd39e7fef..e113a6794c 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -165,8 +165,6 @@ protected:
void _init_conditions_param(const ReaderParams& read_params);
- ColumnPredicate* _parse_to_predicate(const TCondition& condition, bool
opposite = false) const;
-
ColumnPredicate* _parse_to_predicate(
const std::pair<std::string,
std::shared_ptr<IBloomFilterFuncBase>>& bloom_filter);
@@ -201,10 +199,10 @@ protected:
std::vector<bool> _is_lower_keys_included;
std::vector<bool> _is_upper_keys_included;
// contains condition on key columns in agg or unique table or all column
in dup tables
- Conditions _conditions;
+ std::unique_ptr<Conditions> _conditions;
// contains _conditions and condition on value columns, used for push down
// conditions to base rowset of unique table
- Conditions _all_conditions;
+ std::unique_ptr<Conditions> _all_conditions;
std::vector<ColumnPredicate*> _col_predicates;
std::vector<ColumnPredicate*> _value_col_predicates;
DeleteHandler _delete_handler;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index cf916f0eb4..745b5135c8 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -170,8 +170,7 @@ Status BetaRowsetWriter::add_rowset(RowsetSharedPtr rowset)
{
return Status::OK();
}
-Status BetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr
rowset,
- const
SchemaMapping& schema_mapping) {
+Status BetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr
rowset) {
// TODO use schema_mapping to transfer zonemap
return add_rowset(rowset);
}
diff --git a/be/src/olap/rowset/beta_rowset_writer.h
b/be/src/olap/rowset/beta_rowset_writer.h
index c3833f65d8..afa15b4018 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -45,8 +45,7 @@ public:
// add rowset by create hard link
Status add_rowset(RowsetSharedPtr rowset) override;
- Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset,
- const SchemaMapping&
schema_mapping) override;
+ Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset)
override;
Status flush() override;
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 5768b37ced..69b4f65911 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -233,8 +233,6 @@ public:
bool delete_flag() const { return _rowset_meta_pb.delete_flag(); }
- void set_delete_flag(bool delete_flag) {
_rowset_meta_pb.set_delete_flag(delete_flag); }
-
int64_t creation_time() const { return _rowset_meta_pb.creation_time(); }
void set_creation_time(int64_t creation_time) {
@@ -341,6 +339,7 @@ public:
int64_t oldest_write_timestamp() const { return
_rowset_meta_pb.oldest_write_timestamp(); }
int64_t newest_write_timestamp() const { return
_rowset_meta_pb.newest_write_timestamp(); }
+
void set_tablet_schema(const TabletSchemaSPtr& tablet_schema) {
DCHECK(_schema == nullptr);
_schema =
TabletSchemaCache::instance()->insert(tablet_schema->to_key());
@@ -395,7 +394,7 @@ private:
private:
RowsetMetaPB _rowset_meta_pb;
- std::shared_ptr<TabletSchema> _schema = nullptr;
+ TabletSchemaSPtr _schema = nullptr;
RowsetId _rowset_id;
io::FileSystemPtr _fs;
bool _is_removed_from_rowset_meta = false;
diff --git a/be/src/olap/rowset/rowset_writer.h
b/be/src/olap/rowset/rowset_writer.h
index 325e996fe6..2713b3c60c 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -50,8 +50,7 @@ public:
virtual Status add_rowset(RowsetSharedPtr rowset) = 0;
// Precondition: the input `rowset` should have the same type of the
rowset we're building
- virtual Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset,
- const SchemaMapping&
schema_mapping) = 0;
+ virtual Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset)
= 0;
// explicit flush all buffered rows into segment file.
// note that `add_row` could also trigger flush when certain conditions
are met
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 3f391fe58f..04683d75d1 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -39,7 +39,6 @@
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
-#include "vec/olap/block_reader.h"
using std::nothrow;
@@ -239,11 +238,6 @@ private:
RowRefComparator _cmp;
};
-RowBlockChanger::RowBlockChanger(TabletSchemaSPtr tablet_schema, DescriptorTbl
desc_tbl)
- : _desc_tbl(desc_tbl) {
- _schema_mapping.resize(tablet_schema->num_columns());
-}
-
RowBlockChanger::RowBlockChanger(TabletSchemaSPtr tablet_schema,
const DeleteHandler* delete_handler,
DescriptorTbl desc_tbl)
: _desc_tbl(desc_tbl) {
@@ -1138,25 +1132,24 @@ void RowBlockMerger::_pop_heap() {
}
Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader,
RowsetWriter* rowset_writer,
- TabletSharedPtr new_tablet, TabletSharedPtr
base_tablet) {
+ TabletSharedPtr new_tablet,
+ TabletSchemaSPtr base_tablet_schema) {
// In some cases, there may be more than one type of rowset in a tablet,
// in which case the conversion cannot be done directly by linked schema
change,
// but requires direct schema change to rewrite the data.
if (rowset_reader->type() != rowset_writer->type()) {
LOG(INFO) << "the type of rowset " <<
rowset_reader->rowset()->rowset_id()
- << " in base tablet " << base_tablet->tablet_id() << " is
not same as type "
- << rowset_writer->type() << ", use direct schema change.";
+ << " in base tablet is not same as type " <<
rowset_writer->type()
+ << ", use direct schema change.";
return SchemaChangeHandler::get_sc_procedure(_row_block_changer,
false, true)
- ->process(rowset_reader, rowset_writer, new_tablet,
base_tablet);
+ ->process(rowset_reader, rowset_writer, new_tablet,
base_tablet_schema);
} else {
- Status status = rowset_writer->add_rowset_for_linked_schema_change(
- rowset_reader->rowset(),
_row_block_changer.get_schema_mapping());
+ Status status =
rowset_writer->add_rowset_for_linked_schema_change(rowset_reader->rowset());
if (!status) {
LOG(WARNING) << "fail to convert rowset."
<< ", new_tablet=" << new_tablet->full_name()
- << ", base_tablet=" << base_tablet->full_name()
<< ", version=" << rowset_writer->version().first <<
"-"
- << rowset_writer->version().second;
+ << rowset_writer->version().second << ", error status
" << status;
}
return status;
}
@@ -1203,7 +1196,7 @@ Status reserve_block(std::unique_ptr<RowBlock,
RowBlockDeleter>* block_handle_pt
Status SchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr
rowset_reader,
RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet,
- TabletSharedPtr base_tablet) {
+ TabletSchemaSPtr
base_tablet_schema) {
if (_row_block_allocator == nullptr) {
_row_block_allocator = new
RowBlockAllocator(new_tablet->tablet_schema(), 0);
if (_row_block_allocator == nullptr) {
@@ -1273,11 +1266,10 @@ Status
SchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr
rowset_reader,
RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet,
- TabletSharedPtr base_tablet) {
+ TabletSchemaSPtr
base_tablet_schema) {
auto new_block =
std::make_unique<vectorized::Block>(new_tablet->tablet_schema()->create_block());
- auto ref_block =
-
std::make_unique<vectorized::Block>(base_tablet->tablet_schema()->create_block());
+ auto ref_block =
std::make_unique<vectorized::Block>(base_tablet_schema->create_block());
int origin_columns_size = ref_block->columns();
@@ -1322,7 +1314,7 @@ VSchemaChangeWithSorting::VSchemaChangeWithSorting(const
RowBlockChanger& row_bl
Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr
rowset_reader,
RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet,
- TabletSharedPtr base_tablet) {
+ TabletSchemaSPtr
base_tablet_schema) {
if (_row_block_allocator == nullptr) {
_row_block_allocator =
new (nothrow) RowBlockAllocator(new_tablet->tablet_schema(),
_memory_limitation);
@@ -1490,7 +1482,7 @@ Status
SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_read
Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr
rowset_reader,
RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet,
- TabletSharedPtr base_tablet) {
+ TabletSchemaSPtr
base_tablet_schema) {
// for internal sorting
std::vector<std::unique_ptr<vectorized::Block>> blocks;
@@ -1513,8 +1505,7 @@ Status
VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
auto new_block =
std::make_unique<vectorized::Block>(new_tablet->tablet_schema()->create_block());
- auto ref_block =
-
std::make_unique<vectorized::Block>(base_tablet->tablet_schema()->create_block());
+ auto ref_block =
std::make_unique<vectorized::Block>(base_tablet_schema->create_block());
int origin_columns_size = ref_block->columns();
@@ -1755,11 +1746,14 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
}
std::vector<Version> versions_to_be_changed;
- vectorized::BlockReader reader;
+ // reader_context is a stack variable; its lifetime should be the same
+ // as that of rs_readers
+ RowsetReaderContext reader_context;
std::vector<RowsetReaderSharedPtr> rs_readers;
// delete handlers for new tablet
DeleteHandler delete_handler;
std::vector<ColumnId> return_columns;
+ // Create a new tablet schema; it should be merged with the columns dropped by light
weight schema change
TabletSchemaSPtr base_tablet_schema = std::make_shared<TabletSchema>();
base_tablet_schema->copy_from(*base_tablet->tablet_schema());
if (!request.columns.empty() && request.columns[0].col_unique_id >= 0) {
@@ -1768,34 +1762,24 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
base_tablet_schema->append_column(TabletColumn(column));
}
}
+ // Use the tablet schema directly from the base tablet; it is the newest
schema and does not contain
+ // columns dropped during light weight schema change.
+ // But the tablet schema in the base tablet may not be the latest from FE, so
if FE passes through
+ // a tablet schema, use the request schema instead.
+ size_t num_cols = request.columns.empty() ?
base_tablet->tablet_schema()->num_columns()
+ : request.columns.size();
+ return_columns.resize(num_cols);
+ for (int i = 0; i < num_cols; ++i) {
+ return_columns[i] = i;
+ }
// begin to find deltas to convert from base tablet to new tablet so that
// obtain base tablet and new tablet's push lock and header write lock to
prevent loading data
- RowsetReaderContext reader_context;
{
std::lock_guard<std::mutex>
base_tablet_lock(base_tablet->get_push_lock());
std::lock_guard<std::mutex>
new_tablet_lock(new_tablet->get_push_lock());
std::lock_guard<std::shared_mutex>
base_tablet_wlock(base_tablet->get_header_lock());
std::lock_guard<std::shared_mutex>
new_tablet_wlock(new_tablet->get_header_lock());
- // check if the tablet has alter task
- // if it has alter task, it means it is under old alter process
- size_t num_cols = base_tablet_schema->num_columns();
- return_columns.resize(num_cols);
- for (int i = 0; i < num_cols; ++i) {
- return_columns[i] = i;
- }
-
- // reader_context is stack variables, it's lifetime should keep the
same
- // with rs_readers
- reader_context.reader_type = READER_ALTER_TABLE;
- reader_context.tablet_schema = base_tablet_schema;
- reader_context.need_ordered_result = true;
- reader_context.delete_handler = &delete_handler;
- reader_context.return_columns = &return_columns;
- reader_context.sequence_id_idx =
reader_context.tablet_schema->sequence_col_idx();
- reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS;
- reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
- reader_context.is_vec = config::enable_vectorized_alter_table;
do {
RowsetSharedPtr max_rowset;
@@ -1866,30 +1850,30 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
res =
Status::OLAPInternalError(OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS);
break;
}
-
- TabletReader::ReaderParams reader_params;
- reader_params.tablet = base_tablet;
- reader_params.reader_type = READER_ALTER_TABLE;
- reader_params.rs_readers = rs_readers;
- reader_params.tablet_schema = base_tablet_schema;
-
reader_params.return_columns.resize(base_tablet_schema->num_columns());
- std::iota(reader_params.return_columns.begin(),
reader_params.return_columns.end(), 0);
- reader_params.origin_return_columns =
&reader_params.return_columns;
- reader_params.version = {0, end_version};
- // BlockReader::init will call base_tablet->get_header_lock(), but
this lock we already get at outer layer, so we just call TabletReader::init
- RETURN_NOT_OK(reader.TabletReader::init(reader_params));
-
- res = delete_handler.init(base_tablet_schema,
base_tablet->delete_predicates(),
- end_version, &reader);
+ for (auto& delete_pred : base_tablet->delete_predicates()) {
+ if (delete_pred.version() > end_version) {
+ continue;
+ }
+
base_tablet_schema->merge_dropped_columns(base_tablet->tablet_schema(
+ Version(delete_pred.version(),
delete_pred.version())));
+ }
+ res = delete_handler.init(base_tablet, base_tablet_schema,
+ base_tablet->delete_predicates(),
end_version);
if (!res) {
LOG(WARNING) << "init delete handler failed. base_tablet="
<< base_tablet->full_name() << ", end_version="
<< end_version;
-
- // release delete handlers which have been inited successfully.
- delete_handler.finalize();
break;
}
+ reader_context.reader_type = READER_ALTER_TABLE;
+ reader_context.tablet_schema = base_tablet_schema;
+ reader_context.need_ordered_result = true;
+ reader_context.delete_handler = &delete_handler;
+ reader_context.return_columns = &return_columns;
+ reader_context.sequence_id_idx =
reader_context.tablet_schema->sequence_col_idx();
+ reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS;
+ reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
+ reader_context.is_vec = config::enable_vectorized_alter_table;
for (auto& rs_reader : rs_readers) {
res = rs_reader->init(&reader_context);
if (!res) {
@@ -2081,7 +2065,7 @@ Status
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
}
if (res = sc_procedure->process(rs_reader, rowset_writer.get(),
sc_params.new_tablet,
- sc_params.base_tablet);
+ sc_params.base_tablet_schema);
!res) {
LOG(WARNING) << "failed to process the version."
<< " version=" << rs_reader->version().first << "-"
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 36dfed53fa..45747a1400 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -41,14 +41,10 @@ public:
RowBlockChanger(TabletSchemaSPtr tablet_schema, const DeleteHandler*
delete_handler,
DescriptorTbl desc_tbl);
- RowBlockChanger(TabletSchemaSPtr tablet_schema, DescriptorTbl desc_tbl);
-
~RowBlockChanger();
ColumnMapping* get_mutable_column_mapping(size_t column_index);
- const SchemaMapping& get_schema_mapping() const { return _schema_mapping; }
-
Status change_row_block(const RowBlock* ref_block, int32_t data_version,
RowBlock* mutable_block, const uint64_t*
filtered_rows) const;
@@ -91,7 +87,7 @@ public:
virtual ~SchemaChange() = default;
virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter*
rowset_writer,
- TabletSharedPtr new_tablet, TabletSharedPtr
base_tablet) {
+ TabletSharedPtr new_tablet, TabletSchemaSPtr
base_tablet_schema) {
if (rowset_reader->rowset()->empty() ||
rowset_reader->rowset()->num_rows() == 0) {
RETURN_WITH_WARN_IF_ERROR(
rowset_writer->flush(),
@@ -105,7 +101,8 @@ public:
_filtered_rows = 0;
_merged_rows = 0;
- RETURN_IF_ERROR(_inner_process(rowset_reader, rowset_writer,
new_tablet, base_tablet));
+ RETURN_IF_ERROR(
+ _inner_process(rowset_reader, rowset_writer, new_tablet,
base_tablet_schema));
_add_filtered_rows(rowset_reader->filtered_rows());
// Check row num changes
@@ -129,7 +126,7 @@ protected:
void _add_merged_rows(uint64_t merged_rows) { _merged_rows += merged_rows;
}
virtual Status _inner_process(RowsetReaderSharedPtr rowset_reader,
RowsetWriter* rowset_writer,
- TabletSharedPtr new_tablet, TabletSharedPtr
base_tablet) {
+ TabletSharedPtr new_tablet, TabletSchemaSPtr
base_tablet_schema) {
return Status::NotSupported("inner process unsupported.");
};
@@ -157,7 +154,7 @@ public:
~LinkedSchemaChange() override = default;
Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter*
rowset_writer,
- TabletSharedPtr new_tablet, TabletSharedPtr base_tablet)
override;
+ TabletSharedPtr new_tablet, TabletSchemaSPtr
base_tablet_schema) override;
private:
const RowBlockChanger& _row_block_changer;
@@ -174,7 +171,7 @@ public:
private:
Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter*
rowset_writer,
- TabletSharedPtr new_tablet, TabletSharedPtr
base_tablet) override;
+ TabletSharedPtr new_tablet, TabletSchemaSPtr
base_tablet_schema) override;
const RowBlockChanger& _row_block_changer;
RowBlockAllocator* _row_block_allocator;
@@ -191,7 +188,7 @@ public:
private:
Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter*
rowset_writer,
- TabletSharedPtr new_tablet, TabletSharedPtr
base_tablet) override;
+ TabletSharedPtr new_tablet, TabletSchemaSPtr
base_tablet_schema) override;
const RowBlockChanger& _changer;
};
@@ -205,7 +202,7 @@ public:
private:
Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter*
rowset_writer,
- TabletSharedPtr new_tablet, TabletSharedPtr
base_tablet) override;
+ TabletSharedPtr new_tablet, TabletSchemaSPtr
base_tablet_schema) override;
bool _internal_sorting(const std::vector<RowBlock*>& row_block_arr,
const Version& temp_delta_versions, int64_t
oldest_write_timestamp,
@@ -230,7 +227,7 @@ public:
private:
Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter*
rowset_writer,
- TabletSharedPtr new_tablet, TabletSharedPtr
base_tablet) override;
+ TabletSharedPtr new_tablet, TabletSchemaSPtr
base_tablet_schema) override;
Status _internal_sorting(const
std::vector<std::unique_ptr<vectorized::Block>>& blocks,
const Version& temp_delta_versions, int64_t
oldest_write_timestamp,
@@ -278,14 +275,6 @@ public:
static bool tablet_in_converting(int64_t tablet_id);
private:
- // Check the status of schema change and clear information between "a
pair" of Schema change tables
- // Since A->B's schema_change information for A will be overwritten in
subsequent processing (no extra cleanup here)
- // Returns:
- // Success: If there is historical information, then clear it if there is
no problem; or no historical information
- // Failure: otherwise, if there is history information and it cannot be
emptied (version has not been completed)
- static Status _check_and_clear_schema_change_info(TabletSharedPtr tablet,
- const TAlterTabletReq&
request);
-
static Status _get_versions_to_be_changed(TabletSharedPtr base_tablet,
std::vector<Version>*
versions_to_be_changed,
RowsetSharedPtr* max_rowset);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index e038fe78ab..49ae2fd2ea 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -753,21 +753,10 @@ Status Tablet::capture_rs_readers(const
std::vector<Version>& version_path,
return Status::OK();
}
-void Tablet::add_delete_predicate(const DeletePredicatePB& delete_predicate,
int64_t version) {
- _tablet_meta->add_delete_predicate(delete_predicate, version);
-}
-
-// TODO(lingbin): what is the difference between
version_for_delete_predicate() and
-// version_for_load_deletion()? should at least leave a comment
bool Tablet::version_for_delete_predicate(const Version& version) {
return _tablet_meta->version_for_delete_predicate(version);
}
-bool Tablet::version_for_load_deletion(const Version& version) {
- RowsetSharedPtr rowset = _rs_version_map.at(version);
- return rowset->delete_flag();
-}
-
bool Tablet::can_do_compaction(size_t path_hash, CompactionType
compaction_type) {
if (compaction_type == CompactionType::BASE_COMPACTION && tablet_state()
!= TABLET_RUNNING) {
// base compaction can only be done for tablet in TABLET_RUNNING state.
@@ -1765,8 +1754,8 @@ Status Tablet::cooldown() {
if (!has_shutdown) {
modify_rowsets(to_add, to_delete);
if (new_rowset_meta->has_delete_predicate()) {
- add_delete_predicate(new_rowset_meta->delete_predicate(),
- new_rowset_meta->start_version());
+
_tablet_meta->add_delete_predicate(new_rowset_meta->delete_predicate(),
+
new_rowset_meta->start_version());
}
_self_owned_remote_rowsets.insert(to_add.front());
save_meta();
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 51cf455c58..65cee05302 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -159,9 +159,7 @@ public:
const std::vector<DeletePredicatePB>& delete_predicates() {
return _tablet_meta->delete_predicates();
}
- void add_delete_predicate(const DeletePredicatePB& delete_predicate,
int64_t version);
bool version_for_delete_predicate(const Version& version);
- bool version_for_load_deletion(const Version& version);
// meta lock
std::shared_mutex& get_header_lock() { return _meta_lock; }
@@ -287,6 +285,11 @@ public:
TabletSchemaSPtr tablet_schema() const override;
+ // Find the related rowset with specified version and return its tablet
schema
+ TabletSchemaSPtr tablet_schema(Version version) const {
+ return _tablet_meta->tablet_schema(version);
+ }
+
Status create_rowset_writer(const Version& version, const RowsetStatePB&
rowset_state,
const SegmentsOverlapPB& overlap,
TabletSchemaSPtr tablet_schema,
int64_t oldest_write_timestamp, int64_t
newest_write_timestamp,
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 1a2c429dcc..6e2d23ddc6 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -580,6 +580,19 @@ Version TabletMeta::max_version() const {
return max_version;
}
+// Find the rowset with specified version and return its schema
+// Currently, this API is used by delete condition
+const TabletSchemaSPtr TabletMeta::tablet_schema(Version version) const {
+ auto it = _rs_metas.begin();
+ while (it != _rs_metas.end()) {
+ if ((*it)->version() == version) {
+ return (*it)->tablet_schema();
+ }
+ ++it;
+ }
+ return nullptr;
+}
+
Status TabletMeta::add_rs_meta(const RowsetMetaSharedPtr& rs_meta) {
// check RowsetMeta is valid
for (auto& rs : _rs_metas) {
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 9cea34b248..d0f591e354 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -148,6 +148,8 @@ public:
TabletSchemaSPtr tablet_schema() const;
+ const TabletSchemaSPtr tablet_schema(Version version) const;
+
TabletSchema* mutable_tablet_schema();
const std::vector<RowsetMetaSharedPtr>& all_rs_metas() const;
@@ -232,7 +234,7 @@ private:
TabletState _tablet_state = TABLET_NOTREADY;
// the reference of _schema may use in tablet, so here need keep
// the lifetime of tablemeta and _schema is same with tablet
- std::shared_ptr<TabletSchema> _schema;
+ TabletSchemaSPtr _schema;
std::vector<RowsetMetaSharedPtr> _rs_metas;
// This variable _stale_rs_metas is used to record these rowsets‘ meta
which are be compacted.
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index e691cc2a4f..70c06adab9 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -458,14 +458,18 @@ vectorized::AggregateFunctionPtr
TabletColumn::get_aggregate_function(
agg_name, argument_types, {},
argument_types.back()->is_nullable());
}
-void TabletSchema::append_column(TabletColumn column) {
+void TabletSchema::append_column(TabletColumn column, bool is_dropped_column) {
if (column.is_key()) {
_num_key_columns++;
}
if (column.is_nullable()) {
_num_null_columns++;
}
- _field_name_to_index[column.name()] = _num_columns;
+ // The dropped column may have the same name as an existing column, so
+ // do not add it to the name-to-index map, only to the uid-to-index map
+ if (!is_dropped_column) {
+ _field_name_to_index[column.name()] = _num_columns;
+ }
_field_id_to_index[column.unique_id()] = _num_columns;
_cols.push_back(std::move(column));
_num_columns++;
@@ -590,6 +594,34 @@ void TabletSchema::build_current_tablet_schema(int64_t
index_id, int32_t version
}
}
+void TabletSchema::merge_dropped_columns(TabletSchemaSPtr src_schema) {
+ // If they are the same tablet schema object, then just return
+ if (this == src_schema.get()) {
+ return;
+ }
+ for (const auto& src_col : src_schema->columns()) {
+ if (_field_id_to_index.find(src_col.unique_id()) ==
_field_id_to_index.end()) {
+ CHECK(!src_col.is_key()) << src_col.name() << " is key column,
should not be dropped.";
+ ColumnPB src_col_pb;
+ // There are some pointers in tablet column and their reference
relations are unclear, so
+ // deep copy it.
+ src_col.to_schema_pb(&src_col_pb);
+ TabletColumn new_col(src_col_pb);
+ append_column(new_col, /* is_dropped_column */ true);
+ }
+ }
+}
+
+// Dropped column is in _field_id_to_index but not in _field_name_to_index
+// Could refer to append_column method
+bool TabletSchema::is_dropped_column(const TabletColumn& col) const {
+ CHECK(_field_id_to_index.find(col.unique_id()) != _field_id_to_index.end())
+ << "could not find col with unique id = " << col.unique_id()
+ << " and name = " << col.name();
+ return _field_name_to_index.find(col.name()) == _field_name_to_index.end()
||
+ column(col.name()).unique_id() != col.unique_id();
+}
+
void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_schema_pb) const {
tablet_schema_pb->set_keys_type(_keys_type);
for (auto& col : _cols) {
@@ -655,11 +687,13 @@ const TabletColumn& TabletSchema::column(size_t ordinal)
const {
return _cols[ordinal];
}
-void TabletSchema::init_field_index_for_test() {
- _field_name_to_index.clear();
- for (int i = 0; i < _cols.size(); ++i) {
- _field_name_to_index[_cols[i].name()] = i;
- }
+const TabletColumn& TabletSchema::column_by_uid(int32_t col_unique_id) const {
+ return _cols.at(_field_id_to_index.at(col_unique_id));
+}
+
+const TabletColumn& TabletSchema::column(const std::string& field_name) const {
+ const auto& found = _field_name_to_index.find(field_name);
+ return _cols[found->second];
}
vectorized::Block TabletSchema::create_block(
@@ -678,9 +712,12 @@ vectorized::Block TabletSchema::create_block(
return block;
}
-vectorized::Block TabletSchema::create_block() const {
+vectorized::Block TabletSchema::create_block(bool ignore_dropped_col) const {
vectorized::Block block;
for (const auto& col : _cols) {
+ if (ignore_dropped_col && is_dropped_column(col)) {
+ continue;
+ }
auto data_type =
vectorized::DataTypeFactory::instance().create_data_type(col);
block.insert({data_type->create_column(), data_type, col.name()});
}
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 8271b2eafd..ce7a1171f8 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -125,7 +125,7 @@ public:
TabletSchema() = default;
void init_from_pb(const TabletSchemaPB& schema);
void to_schema_pb(TabletSchemaPB* tablet_meta_pb) const;
- void append_column(TabletColumn column);
+ void append_column(TabletColumn column, bool is_dropped_column = false);
void copy_from(const TabletSchema& tablet_schema);
std::string to_key() const;
uint32_t mem_size() const;
@@ -134,6 +134,8 @@ public:
int32_t field_index(const std::string& field_name) const;
int32_t field_index(int32_t col_unique_id) const;
const TabletColumn& column(size_t ordinal) const;
+ const TabletColumn& column(const std::string& field_name) const;
+ const TabletColumn& column_by_uid(int32_t col_unique_id) const;
const std::vector<TabletColumn>& columns() const;
size_t num_columns() const { return _num_columns; }
size_t num_key_columns() const { return _num_key_columns; }
@@ -164,16 +166,28 @@ public:
vectorized::Block create_block(
const std::vector<uint32_t>& return_columns,
const std::unordered_set<uint32_t>*
tablet_columns_need_convert_null = nullptr) const;
- vectorized::Block create_block() const;
+ vectorized::Block create_block(bool ignore_dropped_col = true) const;
void build_current_tablet_schema(int64_t index_id, int32_t version,
const POlapTableIndexSchema& index,
const TabletSchema& out_tablet_schema);
-private:
- // Only for unit test.
- void init_field_index_for_test();
+ // Merge columns that do not exist in current schema; these columns are dropped
in current schema
+ // but they are useful in some cases. For example,
+ // 1. origin schema is ColA, ColB
+ // 2. insert values 1, 2
+ // 3. delete where ColB = 2
+ // 4. drop ColB
+ // 5. insert values 3
+ // 6. add column ColB; although it is named ColB, it is different from the
previous ColB, so we call the new column ColB'
+ // 7. insert value 4, 5
+ // Then the read schema should be ColA, ColB, ColB' because the delete
predicate needs ColB to remove related data.
+ // Because they have the same name, the dropped column should not be
added to the name map, only to the unique id map.
+ void merge_dropped_columns(std::shared_ptr<TabletSchema> src_schema);
+
+ bool is_dropped_column(const TabletColumn& col) const;
+private:
friend bool operator==(const TabletSchema& a, const TabletSchema& b);
friend bool operator!=(const TabletSchema& a, const TabletSchema& b);
diff --git a/be/src/olap/tuple_reader.cpp b/be/src/olap/tuple_reader.cpp
index b9bd33fff1..146249fb79 100644
--- a/be/src/olap/tuple_reader.cpp
+++ b/be/src/olap/tuple_reader.cpp
@@ -66,7 +66,7 @@ Status TupleReader::_init_collect_iter(const ReaderParams&
read_params,
}
Status TupleReader::init(const ReaderParams& read_params) {
- TabletReader::init(read_params);
+ RETURN_NOT_OK(TabletReader::init(read_params));
std::vector<RowsetReaderSharedPtr> rs_readers;
auto status = _init_collect_iter(read_params, &rs_readers);
diff --git a/be/src/vec/exec/volap_scanner.cpp
b/be/src/vec/exec/volap_scanner.cpp
index 7a349ba26d..835a3dd59b 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -197,6 +197,12 @@ Status VOlapScanner::_init_tablet_reader_params(
std::inserter(_tablet_reader_params.delete_predicates,
_tablet_reader_params.delete_predicates.begin()));
+ // Merge the columns in delete predicates that are not in the latest schema into
the current tablet schema
+ for (auto& del_pred_pb : _tablet_reader_params.delete_predicates) {
+ _tablet_schema->merge_dropped_columns(
+ _tablet->tablet_schema(Version(del_pred_pb.version(),
del_pred_pb.version())));
+ }
+
// Range
for (auto key_range : key_ranges) {
if (key_range->begin_scan_range.size() == 1 &&
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index ece88b97f8..41b54cea4d 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -105,7 +105,7 @@ void BlockReader::_init_agg_state(const ReaderParams&
read_params) {
}
Status BlockReader::init(const ReaderParams& read_params) {
- TabletReader::init(read_params);
+ RETURN_NOT_OK(TabletReader::init(read_params));
int32_t return_column_size = 0;
// read sequence column if not reader_query
diff --git a/be/test/olap/delete_handler_test.cpp
b/be/test/olap/delete_handler_test.cpp
index db90b68d3f..c0582bdf06 100644
--- a/be/test/olap/delete_handler_test.cpp
+++ b/be/test/olap/delete_handler_test.cpp
@@ -30,6 +30,7 @@
#include "olap/olap_define.h"
#include "olap/options.h"
#include "olap/push_handler.h"
+#include "olap/rowset/beta_rowset.h"
#include "olap/storage_engine.h"
#include "olap/utils.h"
#include "util/cpu_info.h"
@@ -813,6 +814,54 @@ protected:
_data_row_cursor.init(tablet->tablet_schema());
_data_row_cursor.allocate_memory_for_string_type(tablet->tablet_schema());
+ _json_rowset_meta = R"({
+ "rowset_id": 540081,
+ "tablet_id": 15673,
+ "txn_id": 4042,
+ "tablet_schema_hash": 567997577,
+ "rowset_type": "BETA_ROWSET",
+ "rowset_state": "VISIBLE",
+ "start_version": 2,
+ "end_version": 2,
+ "num_rows": 3929,
+ "total_disk_size": 84699,
+ "data_disk_size": 84464,
+ "index_disk_size": 235,
+ "empty": false,
+ "load_id": {
+ "hi": -5350970832824939812,
+ "lo": -6717994719194512122
+ },
+ "creation_time": 1553765670,
+ "alpha_rowset_extra_meta_pb": {
+ "segment_groups": [
+ {
+ "segment_group_id": 0,
+ "num_segments": 2,
+ "index_size": 132,
+ "data_size": 576,
+ "num_rows": 5,
+ "zone_maps": [
+ {
+ "min": "MQ==",
+ "max": "NQ==",
+ "null_flag": false
+ },
+ {
+ "min": "MQ==",
+ "max": "Mw==",
+ "null_flag": false
+ },
+ {
+ "min": "J2J1c2gn",
+ "max": "J3RvbSc=",
+ "null_flag": false
+ }
+ ],
+ "empty": false
+ }]
+ }
+ })";
}
void TearDown() {
@@ -824,11 +873,31 @@ protected:
EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok());
}
+ void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end) {
+ pb1->init_from_json(_json_rowset_meta);
+ pb1->set_start_version(start);
+ pb1->set_end_version(end);
+ pb1->set_creation_time(10000);
+ }
+
+ void add_delete_predicate(DeletePredicatePB& del_pred, int64_t version) {
+ RowsetMetaSharedPtr rsm(new RowsetMeta());
+ init_rs_meta(rsm, version, version);
+ RowsetId id;
+ id.init(version * 1000);
+ rsm->set_rowset_id(id);
+ rsm->set_delete_predicate(del_pred);
+ rsm->set_tablet_schema(tablet->tablet_schema());
+ RowsetSharedPtr rowset =
std::make_shared<BetaRowset>(tablet->tablet_schema(), "", rsm);
+ tablet->add_rowset(rowset);
+ }
+
std::string _tablet_path;
RowCursor _data_row_cursor;
TabletSharedPtr tablet;
TCreateTabletReq _create_tablet;
DeleteHandler _delete_handler;
+ std::string _json_rowset_meta;
};
TEST_F(TestDeleteHandler, InitSuccess) {
@@ -858,7 +927,7 @@ TEST_F(TestDeleteHandler, InitSuccess) {
DeletePredicatePB del_pred;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions, &del_pred);
EXPECT_EQ(Status::OK(), res);
- tablet->add_delete_predicate(del_pred, 1);
+ add_delete_predicate(del_pred, 2);
conditions.clear();
condition.column_name = "k1";
@@ -871,7 +940,7 @@ TEST_F(TestDeleteHandler, InitSuccess) {
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_2);
EXPECT_EQ(Status::OK(), res);
- tablet->add_delete_predicate(del_pred_2, 2);
+ add_delete_predicate(del_pred_2, 3);
conditions.clear();
condition.column_name = "k2";
@@ -884,7 +953,7 @@ TEST_F(TestDeleteHandler, InitSuccess) {
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_3);
EXPECT_EQ(Status::OK(), res);
- tablet->add_delete_predicate(del_pred_3, 3);
+ add_delete_predicate(del_pred_3, 4);
conditions.clear();
condition.column_name = "k2";
@@ -897,19 +966,19 @@ TEST_F(TestDeleteHandler, InitSuccess) {
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_4);
EXPECT_EQ(Status::OK(), res);
- tablet->add_delete_predicate(del_pred_4, 4);
+ add_delete_predicate(del_pred_4, 5);
- // 从header文件中取出版本号小于等于7的过滤条件
- res = _delete_handler.init(tablet->tablet_schema(),
tablet->delete_predicates(), 4);
+ // Get delete conditions which version <= 5
+ res = _delete_handler.init(tablet, tablet->tablet_schema(),
tablet->delete_predicates(), 5);
EXPECT_EQ(Status::OK(), res);
EXPECT_EQ(4, _delete_handler.conditions_num());
std::vector<int64_t> conds_version = _delete_handler.get_conds_version();
EXPECT_EQ(4, conds_version.size());
sort(conds_version.begin(), conds_version.end());
- EXPECT_EQ(1, conds_version[0]);
- EXPECT_EQ(2, conds_version[1]);
- EXPECT_EQ(3, conds_version[2]);
- EXPECT_EQ(4, conds_version[3]);
+ EXPECT_EQ(2, conds_version[0]);
+ EXPECT_EQ(3, conds_version[1]);
+ EXPECT_EQ(4, conds_version[2]);
+ EXPECT_EQ(5, conds_version[3]);
_delete_handler.finalize();
}
@@ -938,10 +1007,10 @@ TEST_F(TestDeleteHandler, FilterDataSubconditions) {
DeletePredicatePB del_pred;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions, &del_pred);
EXPECT_EQ(Status::OK(), res);
- tablet->add_delete_predicate(del_pred, 1);
+ add_delete_predicate(del_pred, 2);
// 指定版本号为10以载入Header中的所有过滤条件(在这个case中,只有过滤条件1)
- res = _delete_handler.init(tablet->tablet_schema(),
tablet->delete_predicates(), 4);
+ res = _delete_handler.init(tablet, tablet->tablet_schema(),
tablet->delete_predicates(), 4);
EXPECT_EQ(Status::OK(), res);
EXPECT_EQ(1, _delete_handler.conditions_num());
@@ -995,7 +1064,7 @@ TEST_F(TestDeleteHandler, FilterDataConditions) {
DeletePredicatePB del_pred;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions, &del_pred);
EXPECT_EQ(Status::OK(), res);
- tablet->add_delete_predicate(del_pred, 1);
+ add_delete_predicate(del_pred, 2);
// 过滤条件2
conditions.clear();
@@ -1009,7 +1078,7 @@ TEST_F(TestDeleteHandler, FilterDataConditions) {
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_2);
EXPECT_EQ(Status::OK(), res);
- tablet->add_delete_predicate(del_pred_2, 2);
+ add_delete_predicate(del_pred_2, 3);
// 过滤条件3
conditions.clear();
@@ -1023,10 +1092,10 @@ TEST_F(TestDeleteHandler, FilterDataConditions) {
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_3);
EXPECT_EQ(Status::OK(), res);
- tablet->add_delete_predicate(del_pred_3, 3);
+ add_delete_predicate(del_pred_3, 4);
// 指定版本号为4以载入meta中的所有过滤条件(在这个case中,只有过滤条件1)
- res = _delete_handler.init(tablet->tablet_schema(),
tablet->delete_predicates(), 4);
+ res = _delete_handler.init(tablet, tablet->tablet_schema(),
tablet->delete_predicates(), 4);
EXPECT_EQ(Status::OK(), res);
EXPECT_EQ(3, _delete_handler.conditions_num());
@@ -1072,7 +1141,7 @@ TEST_F(TestDeleteHandler, FilterDataVersion) {
DeletePredicatePB del_pred;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions, &del_pred);
EXPECT_EQ(Status::OK(), res);
- tablet->add_delete_predicate(del_pred, 3);
+ add_delete_predicate(del_pred, 3);
// 过滤条件2
conditions.clear();
@@ -1086,10 +1155,10 @@ TEST_F(TestDeleteHandler, FilterDataVersion) {
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_2);
EXPECT_EQ(Status::OK(), res);
- tablet->add_delete_predicate(del_pred_2, 4);
+ add_delete_predicate(del_pred_2, 4);
// 指定版本号为4以载入meta中的所有过滤条件(过滤条件1,过滤条件2)
- res = _delete_handler.init(tablet->tablet_schema(),
tablet->delete_predicates(), 4);
+ res = _delete_handler.init(tablet, tablet->tablet_schema(),
tablet->delete_predicates(), 4);
EXPECT_EQ(Status::OK(), res);
EXPECT_EQ(2, _delete_handler.conditions_num());
diff --git a/be/test/olap/rowid_conversion_test.cpp
b/be/test/olap/rowid_conversion_test.cpp
index d0fc20785d..d105223f55 100644
--- a/be/test/olap/rowid_conversion_test.cpp
+++ b/be/test/olap/rowid_conversion_test.cpp
@@ -24,6 +24,7 @@
#include "olap/delete_handler.h"
#include "olap/merger.h"
#include "olap/row_cursor.h"
+#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_reader.h"
@@ -197,6 +198,74 @@ protected:
return rowset;
}
+ void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end) {
+ std::string json_rowset_meta = R"({
+ "rowset_id": 540081,
+ "tablet_id": 15673,
+ "txn_id": 4042,
+ "tablet_schema_hash": 567997577,
+ "rowset_type": "BETA_ROWSET",
+ "rowset_state": "VISIBLE",
+ "start_version": 2,
+ "end_version": 2,
+ "num_rows": 3929,
+ "total_disk_size": 84699,
+ "data_disk_size": 84464,
+ "index_disk_size": 235,
+ "empty": false,
+ "load_id": {
+ "hi": -5350970832824939812,
+ "lo": -6717994719194512122
+ },
+ "creation_time": 1553765670,
+ "alpha_rowset_extra_meta_pb": {
+ "segment_groups": [
+ {
+ "segment_group_id": 0,
+ "num_segments": 2,
+ "index_size": 132,
+ "data_size": 576,
+ "num_rows": 5,
+ "zone_maps": [
+ {
+ "min": "MQ==",
+ "max": "NQ==",
+ "null_flag": false
+ },
+ {
+ "min": "MQ==",
+ "max": "Mw==",
+ "null_flag": false
+ },
+ {
+ "min": "J2J1c2gn",
+ "max": "J3RvbSc=",
+ "null_flag": false
+ }
+ ],
+ "empty": false
+ }]
+ }
+ })";
+ pb1->init_from_json(json_rowset_meta);
+ pb1->set_start_version(start);
+ pb1->set_end_version(end);
+ pb1->set_creation_time(10000);
+ }
+
+ void add_delete_predicate(TabletSharedPtr tablet, DeletePredicatePB&
del_pred,
+ int64_t version) {
+ RowsetMetaSharedPtr rsm(new RowsetMeta());
+ init_rs_meta(rsm, version, version);
+ RowsetId id;
+ id.init(version * 1000);
+ rsm->set_rowset_id(id);
+ rsm->set_delete_predicate(del_pred);
+ rsm->set_tablet_schema(tablet->tablet_schema());
+ RowsetSharedPtr rowset =
std::make_shared<BetaRowset>(tablet->tablet_schema(), "", rsm);
+ tablet->add_rowset(rowset);
+ }
+
TabletSharedPtr create_tablet(const TabletSchema& tablet_schema,
bool enable_unique_key_merge_on_write,
int64_t version,
bool has_delete_handler) {
@@ -209,7 +278,7 @@ protected:
col.__set_column_name(column.name());
col.__set_is_key(column.is_key());
cols.push_back(col);
- col_ordinal_to_unique_id[i] = i;
+ col_ordinal_to_unique_id[i] = column.unique_id();
}
TTabletSchema t_tablet_schema;
@@ -226,6 +295,9 @@ protected:
new TabletMeta(1, 1, 1, 1, 1, 1, t_tablet_schema, 1,
col_ordinal_to_unique_id,
UniqueId(1, 2), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F, "",
enable_unique_key_merge_on_write));
+
+ TabletSharedPtr tablet(new Tablet(tablet_meta, nullptr));
+ tablet->init();
if (has_delete_handler) {
// delete data with key < 1000
std::vector<TCondition> conditions;
@@ -240,10 +312,8 @@ protected:
Status st =
DeleteHandler::generate_delete_predicate(tablet_schema,
conditions, &del_pred);
EXPECT_EQ(Status::OK(), st);
- tablet_meta->add_delete_predicate(del_pred, version);
+ add_delete_predicate(tablet, del_pred, version);
}
-
- TabletSharedPtr tablet(new Tablet(tablet_meta, nullptr));
return tablet;
}
diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp
b/be/test/olap/rowset/segment_v2/segment_test.cpp
index 105a9f5c3c..d75f6c44f7 100644
--- a/be/test/olap/rowset/segment_v2/segment_test.cpp
+++ b/be/test/olap/rowset/segment_v2/segment_test.cpp
@@ -104,19 +104,13 @@ protected:
TabletSchemaSPtr create_schema(const std::vector<TabletColumn>& columns,
KeysType keys_type = DUP_KEYS, int
num_custom_key_columns = -1) {
TabletSchemaSPtr res = std::make_shared<TabletSchema>();
- int num_key_columns = 0;
+
for (auto& col : columns) {
- if (col.is_key()) {
- num_key_columns++;
- }
- res->_cols.push_back(col);
+ res->append_column(col);
}
- res->_num_columns = columns.size();
- res->_num_key_columns = num_key_columns;
res->_num_short_key_columns =
- num_custom_key_columns != -1 ? num_custom_key_columns :
num_key_columns;
+ num_custom_key_columns != -1 ? num_custom_key_columns :
res->num_key_columns();
res->_keys_type = keys_type;
- res->init_field_index_for_test();
return res;
}
@@ -571,8 +565,9 @@ TEST_F(SegmentReaderWriterTest, TestIndex) {
condition.__set_condition_op("<");
std::vector<std::string> vals = {"2"};
condition.__set_condition_values(vals);
- std::shared_ptr<Conditions> conditions(new Conditions());
- conditions->set_tablet_schema(tablet_schema);
+ std::shared_ptr<Conditions> conditions(new
Conditions(tablet_schema));
+ condition.__set_column_unique_id(
+ tablet_schema->column(condition.column_name).unique_id());
EXPECT_EQ(Status::OK(), conditions->append_condition(condition));
StorageReadOptions read_opts;
@@ -595,8 +590,9 @@ TEST_F(SegmentReaderWriterTest, TestIndex) {
condition.__set_condition_op("<");
std::vector<std::string> vals = {"100"};
condition.__set_condition_values(vals);
- std::shared_ptr<Conditions> conditions(new Conditions());
- conditions->set_tablet_schema(tablet_schema);
+ std::shared_ptr<Conditions> conditions(new
Conditions(tablet_schema));
+ condition.__set_column_unique_id(
+ tablet_schema->column(condition.column_name).unique_id());
EXPECT_EQ(Status::OK(), conditions->append_condition(condition));
StorageReadOptions read_opts;
@@ -645,8 +641,9 @@ TEST_F(SegmentReaderWriterTest, TestIndex) {
condition.__set_condition_op("<");
std::vector<std::string> vals = {"165000"};
condition.__set_condition_values(vals);
- std::shared_ptr<Conditions> conditions(new Conditions());
- conditions->set_tablet_schema(tablet_schema);
+ std::shared_ptr<Conditions> conditions(new
Conditions(tablet_schema));
+ condition.__set_column_unique_id(
+ tablet_schema->column(condition.column_name).unique_id());
EXPECT_EQ(Status::OK(), conditions->append_condition(condition));
// the second page read will be pruned by the following delete
predicate
@@ -655,8 +652,9 @@ TEST_F(SegmentReaderWriterTest, TestIndex) {
delete_condition.__set_condition_op("=");
std::vector<std::string> vals2 = {"164001"};
delete_condition.__set_condition_values(vals2);
- std::shared_ptr<Conditions> delete_conditions(new Conditions());
- delete_conditions->set_tablet_schema(tablet_schema);
+ std::shared_ptr<Conditions> delete_conditions(new
Conditions(tablet_schema));
+ delete_condition.__set_column_unique_id(
+
tablet_schema->column(delete_condition.column_name).unique_id());
EXPECT_EQ(Status::OK(),
delete_conditions->append_condition(delete_condition));
StorageReadOptions read_opts;
@@ -710,8 +708,11 @@ TEST_F(SegmentReaderWriterTest, TestIndex) {
// 102 is not in page 1
std::vector<std::string> vals = {"102"};
condition.__set_condition_values(vals);
- std::shared_ptr<Conditions> conditions(new Conditions());
- conditions->set_tablet_schema(tablet_schema);
+ std::shared_ptr<Conditions> conditions(new
Conditions(tablet_schema));
+ condition.__set_column_unique_id(
+ tablet_schema->column(condition.column_name).unique_id());
+ condition.__set_column_unique_id(
+ tablet_schema->column(condition.column_name).unique_id());
EXPECT_EQ(Status::OK(), conditions->append_condition(condition));
read_opts.conditions = conditions.get();
std::unique_ptr<RowwiseIterator> iter;
@@ -895,15 +896,12 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
MemPool pool;
std::shared_ptr<TabletSchema> tablet_schema(new TabletSchema());
- tablet_schema->_num_columns = 4;
- tablet_schema->_num_key_columns = 3;
tablet_schema->_num_short_key_columns = 2;
tablet_schema->_num_rows_per_row_block = num_rows_per_block;
- tablet_schema->_cols.push_back(create_char_key(1));
- tablet_schema->_cols.push_back(create_char_key(2));
- tablet_schema->_cols.push_back(create_varchar_key(3));
- tablet_schema->_cols.push_back(create_varchar_key(4));
- tablet_schema->init_field_index_for_test();
+ tablet_schema->append_column(create_char_key(1));
+ tablet_schema->append_column(create_char_key(2));
+ tablet_schema->append_column(create_varchar_key(3));
+ tablet_schema->append_column(create_varchar_key(4));
SegmentWriterOptions opts;
opts.num_rows_per_block = num_rows_per_block;
@@ -1061,8 +1059,9 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
condition.__set_condition_op(">");
std::vector<std::string> vals = {"100"};
condition.__set_condition_values(vals);
- std::shared_ptr<Conditions> conditions(new Conditions());
- conditions->set_tablet_schema(tablet_schema);
+ std::shared_ptr<Conditions> conditions(new
Conditions(tablet_schema));
+ condition.__set_column_unique_id(
+ tablet_schema->column(condition.column_name).unique_id());
EXPECT_EQ(Status::OK(), conditions->append_condition(condition));
StorageReadOptions read_opts;
@@ -1119,8 +1118,9 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
condition.__set_condition_op("<");
std::vector<std::string> vals = {"-2"};
condition.__set_condition_values(vals);
- std::shared_ptr<Conditions> conditions(new Conditions());
- conditions->set_tablet_schema(tablet_schema);
+ std::shared_ptr<Conditions> conditions(new
Conditions(tablet_schema));
+ condition.__set_column_unique_id(
+ tablet_schema->column(condition.column_name).unique_id());
EXPECT_EQ(Status::OK(), conditions->append_condition(condition));
StorageReadOptions read_opts;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 605e335153..72b9457a3f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -443,7 +443,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
long originIdxId = indexIdMap.get(shadowIdxId);
int shadowSchemaHash =
indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
int originSchemaHash =
tbl.getSchemaHashByIndexId(indexIdMap.get(shadowIdxId));
- List<Column> originSchemaColumns =
tbl.getSchemaByIndexId(originIdxId);
+ List<Column> originSchemaColumns =
tbl.getSchemaByIndexId(originIdxId, true);
for (Tablet shadowTablet : shadowIdx.getTablets()) {
long shadowTabletId = shadowTablet.getId();
long originTabletId =
partitionIndexTabletMap.get(partitionId, shadowIdxId).get(shadowTabletId);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index b9ea800554..94f61376d4 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -505,6 +505,9 @@ struct TCondition {
1: required string column_name
2: required string condition_op
3: required list<string> condition_values
+ // In a delete condition, different columns may share the same column name,
+ // so the column unique id is used to distinguish them.
+ 4: optional i32 column_unique_id
}
struct TExportStatusResult {
diff --git
a/regression-test/data/schema_change/test_alter_table_column_with_delete_drop_column_dup_key.out
b/regression-test/data/schema_change/test_alter_table_column_with_delete_drop_column_dup_key.out
new file mode 100644
index 0000000000..942fcecf2e
--- /dev/null
+++
b/regression-test/data/schema_change/test_alter_table_column_with_delete_drop_column_dup_key.out
@@ -0,0 +1,133 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+1 1 1 1
+3 3 3 3
+4 4 4 4
+
+-- !sql --
+1 1 1 1
+3 3 3 3
+4 4 4 4
+
+-- !sql --
+1 1 1
+3 3 3
+4 4 4
+
+-- !sql --
+1 1 1
+3 3 3
+4 4 4
+
+-- !sql --
+1 1 1 A
+3 3 3 A
+4 4 4 A
+
+-- !sql --
+1 1 1 A
+3 3 3 A
+4 4 4 A
+
+-- !sql --
+1 1 1 A
+3 3 3 A
+4 4 4 A
+5 5 5 B
+
+-- !sql --
+1 1 1 A
+3 3 3 A
+4 4 4 A
+5 5 5 B
+
+-- !sql --
+3 3 3 3
+
+-- !sql --
+3 3 3 3
+
+-- !sql --
+3 3 3
+
+-- !sql --
+3 3 3
+
+-- !sql --
+4 4 4 A
+
+-- !sql --
+4 4 4 A
+
+-- !sql --
+1 1 1 A
+3 3 3 A
+4 4 4 A
+5 5 5 B
+
+-- !sql --
+1 1 1 A
+3 3 3 A
+4 4 4 A
+5 5 5 B
+
+-- !sql --
+4 A 4 4 A
+
+-- !sql --
+4 A 4 4 A
+
+-- !sql --
+1 1 1 1
+3 3 3 3
+4 4 4 4
+
+-- !sql --
+1 1 1 1
+3 3 3 3
+4 4 4 4
+
+-- !sql --
+1 1 1
+3 3 3
+4 4 4
+
+-- !sql --
+1 1 1
+3 3 3
+4 4 4
+
+-- !sql --
+1 1 1 A
+3 3 3 A
+4 4 4 A
+
+-- !sql --
+1 1 1 A
+3 3 3 A
+4 4 4 A
+
+-- !sql --
+1 1 1 A
+3 3 3 A
+4 4 4 A
+5 5 5 B
+5 5 5 B
+5 5 5 B
+5 5 5 B
+5 5 5 B
+5 5 5 B
+5 5 5 B
+
+-- !sql --
+1 1 1 A
+3 3 3 A
+4 4 4 A
+5 5 5 B
+5 5 5 B
+5 5 5 B
+5 5 5 B
+5 5 5 B
+5 5 5 B
+5 5 5 B
+
diff --git
a/regression-test/data/schema_change/test_alter_table_column_with_delete_drop_column_unique_key.out
b/regression-test/data/schema_change/test_alter_table_column_with_delete_drop_column_unique_key.out
new file mode 100644
index 0000000000..9aa7fdd9db
--- /dev/null
+++
b/regression-test/data/schema_change/test_alter_table_column_with_delete_drop_column_unique_key.out
@@ -0,0 +1,79 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+1 1 1 2
+3 3 3 3
+4 4 4 4
+
+-- !sql --
+1 1 1 2
+3 3 3 3
+4 4 4 4
+
+-- !sql --
+1 1 1
+3 3 3
+4 4 4
+
+-- !sql --
+1 1 1
+3 3 3
+4 4 4
+
+-- !sql --
+1 1 1 A
+3 3 3 A
+4 4 4 A
+
+-- !sql --
+1 1 1 A
+3 3 3 A
+4 4 4 A
+
+-- !sql --
+1 1 1 A
+3 3 3 A
+4 4 4 A
+5 5 5 B
+
+-- !sql --
+1 1 1 A
+3 3 3 A
+4 4 4 A
+5 5 5 B
+
+-- !sql --
+3 3 3 3
+
+-- !sql --
+3 3 3 3
+
+-- !sql --
+3 3 3
+
+-- !sql --
+3 3 3
+
+-- !sql --
+4 4 4 A
+
+-- !sql --
+4 4 4 A
+
+-- !sql --
+1 1 1 A
+3 3 3 A
+4 4 4 A
+5 5 5 B
+
+-- !sql --
+1 1 1 A
+3 3 3 A
+4 4 4 A
+5 5 5 B
+
+-- !sql --
+4 A 4 4 A
+
+-- !sql --
+4 A 4 4 A
+
diff --git
a/regression-test/suites/schema_change/test_alter_table_column_with_delete_drop_column_dup_key.groovy
b/regression-test/suites/schema_change/test_alter_table_column_with_delete_drop_column_dup_key.groovy
new file mode 100644
index 0000000000..5d2da84b02
--- /dev/null
+++
b/regression-test/suites/schema_change/test_alter_table_column_with_delete_drop_column_dup_key.groovy
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_alter_table_column_with_delete_drop_column_dup_key",
"schema_change") {
+ def tbName1 = "alter_table_column_dup_with_delete_drop_column_dup_key"
+ def getJobState = { tableName ->
+ def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE
IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
+ return jobStateResult[0][9]
+ }
+
+//=========================Test Normal Schema Change
+ sql "DROP TABLE IF EXISTS ${tbName1}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbName1} (
+ k1 INT,
+ value1 INT,
+ value2 INT,
+ value3 INT
+ )
+ DUPLICATE KEY (k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1 properties("replication_num" =
"1", "light_schema_change" = "false", "disable_auto_compaction" = "true");
+ """
+ // delete value3 = 2
+ sql "insert into ${tbName1} values(1,1,1,1);"
+ sql "insert into ${tbName1} values(2,2,2,2);"
+ sql "delete from ${tbName1} where value3 = 2;"
+ sql "insert into ${tbName1} values(3,3,3,3);"
+ sql "insert into ${tbName1} values(4,4,4,4);"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} order by k1;"
+
+ // drop value3
+ sql """
+ ALTER TABLE ${tbName1}
+ DROP COLUMN value3;
+ """
+ int max_try_secs = 1200
+ while (max_try_secs--) {
+ String res = getJobState(tbName1)
+ if (res == "FINISHED") {
+ break
+ } else {
+ Thread.sleep(100)
+ if (max_try_secs < 1) {
+ println "test timeout," + "state:" + res
+ assertEquals("FINISHED",res)
+ }
+ }
+ }
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} order by k1;"
+
+ // add value3 back as a CHAR(100) column
+ sql """
+ ALTER TABLE ${tbName1}
+ ADD COLUMN value3 CHAR(100) DEFAULT 'A';
+ """
+ max_try_secs = 1200
+ while (max_try_secs--) {
+ String res = getJobState(tbName1)
+ if (res == "FINISHED") {
+ break
+ } else {
+ Thread.sleep(100)
+ if (max_try_secs < 1) {
+ println "test timeout," + "state:" + res
+ assertEquals("FINISHED",res)
+ }
+ }
+ }
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} order by k1;"
+
+ sql "insert into ${tbName1} values(5,5,5,'B');"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} order by k1;"
+ sql "DROP TABLE ${tbName1} FORCE;"
+
+//======================= Test Light Weight Schema Change
+ sql "DROP TABLE IF EXISTS ${tbName1}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbName1} (
+ k1 INT,
+ value1 INT,
+ value2 INT,
+ value3 INT
+ )
+ DUPLICATE KEY (k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1 properties("replication_num" =
"1", "light_schema_change" = "true", "disable_auto_compaction" = "true");
+ """
+ // delete value3 = 2
+ sql "insert into ${tbName1} values(1,1,1,1);"
+ sql "insert into ${tbName1} values(2,2,2,2);"
+ sql "delete from ${tbName1} where value3 = 2;"
+ sql "insert into ${tbName1} values(3,3,3,3);"
+ sql "insert into ${tbName1} values(4,4,4,4);"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} where value3=3 order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} where value3=3 order by k1;"
+
+ // drop value3
+ sql """
+ ALTER TABLE ${tbName1}
+ DROP COLUMN value3;
+ """
+ max_try_secs = 1200
+ while (max_try_secs--) {
+ String res = getJobState(tbName1)
+ if (res == "FINISHED") {
+ break
+ } else {
+ Thread.sleep(100)
+ if (max_try_secs < 1) {
+ println "test timeout," + "state:" + res
+ assertEquals("FINISHED",res)
+ }
+ }
+ }
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} where value1=3 order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} where value1=3 order by k1;"
+
+ // add value3 back as a CHAR(100) column
+ sql """
+ ALTER TABLE ${tbName1}
+ ADD COLUMN value3 CHAR(100) DEFAULT 'A';
+ """
+ max_try_secs = 1200
+ while (max_try_secs--) {
+ String res = getJobState(tbName1)
+ if (res == "FINISHED") {
+ break
+ } else {
+ Thread.sleep(100)
+ if (max_try_secs < 1) {
+ println "test timeout," + "state:" + res
+ assertEquals("FINISHED",res)
+ }
+ }
+ }
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} where value1=4 order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} where value1=4 order by k1;"
+
+ sql "insert into ${tbName1} values(5,5,5,'B');"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} order by k1;"
+
+ // Do a schema change that is not a light weight schema change
+ sql """
+ ALTER TABLE ${tbName1}
+ ADD COLUMN k2 CHAR(10) KEY DEFAULT 'A';
+ """
+ max_try_secs = 1200
+ while (max_try_secs--) {
+ String res = getJobState(tbName1)
+ if (res == "FINISHED") {
+ break
+ } else {
+ Thread.sleep(100)
+ if (max_try_secs < 1) {
+ println "test timeout," + "state:" + res
+ assertEquals("FINISHED",res)
+ }
+ }
+ }
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} where value1=4 order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} where value1=4 order by k1;"
+ sql "DROP TABLE ${tbName1} FORCE;"
+
+//======================= Test Light Weight Schema Change with Compaction
+ sql "DROP TABLE IF EXISTS ${tbName1}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbName1} (
+ k1 INT,
+ value1 INT,
+ value2 INT,
+ value3 INT
+ )
+ DUPLICATE KEY (k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1 properties("replication_num" =
"1", "light_schema_change" = "true", "disable_auto_compaction" = "false");
+ """
+ // delete value3 = 2
+ sql "insert into ${tbName1} values(1,1,1,1);"
+ sql "insert into ${tbName1} values(2,2,2,2);"
+ sql "delete from ${tbName1} where value3 = 2;"
+ sql "insert into ${tbName1} values(3,3,3,3);"
+ sql "insert into ${tbName1} values(4,4,4,4);"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} order by k1;"
+
+ // drop value3
+ sql """
+ ALTER TABLE ${tbName1}
+ DROP COLUMN value3;
+ """
+ max_try_secs = 1200
+ while (max_try_secs--) {
+ String res = getJobState(tbName1)
+ if (res == "FINISHED") {
+ break
+ } else {
+ Thread.sleep(100)
+ if (max_try_secs < 1) {
+ println "test timeout," + "state:" + res
+ assertEquals("FINISHED",res)
+ }
+ }
+ }
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} order by k1;"
+
+ // add value3 back as a CHAR(100) column
+ sql """
+ ALTER TABLE ${tbName1}
+ ADD COLUMN value3 CHAR(100) DEFAULT 'A';
+ """
+ max_try_secs = 1200
+ while (max_try_secs--) {
+ String res = getJobState(tbName1)
+ if (res == "FINISHED") {
+ break
+ } else {
+ Thread.sleep(100)
+ if (max_try_secs < 1) {
+ println "test timeout," + "state:" + res
+ assertEquals("FINISHED",res)
+ }
+ }
+ }
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} order by k1;"
+
+ sql "insert into ${tbName1} values(5,5,5,'B');"
+ sql "insert into ${tbName1} values(5,5,5,'B');"
+ sql "insert into ${tbName1} values(5,5,5,'B');"
+ sql "insert into ${tbName1} values(5,5,5,'B');"
+ sql "insert into ${tbName1} values(5,5,5,'B');"
+ sql "insert into ${tbName1} values(5,5,5,'B');"
+ sql "insert into ${tbName1} values(5,5,5,'B');"
+
+ Thread.sleep(5000)
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} order by k1;"
+ sql "DROP TABLE ${tbName1} FORCE;"
+
+}
diff --git
a/regression-test/suites/schema_change/test_alter_table_column_with_delete_drop_column_unique_key.groovy
b/regression-test/suites/schema_change/test_alter_table_column_with_delete_drop_column_unique_key.groovy
new file mode 100644
index 0000000000..88e13ea610
--- /dev/null
+++
b/regression-test/suites/schema_change/test_alter_table_column_with_delete_drop_column_unique_key.groovy
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_alter_table_column_with_delete_drop_column_unique_key",
"schema_change") {
+ def tbName1 = "alter_table_column_dup_with_delete_drop_column_unique_key"
+ def getJobState = { tableName ->
+ def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE
IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
+ return jobStateResult[0][9]
+ }
+
+//=========================Test Normal Schema Change
+ sql "DROP TABLE IF EXISTS ${tbName1}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbName1} (
+ k1 INT,
+ value1 INT,
+ value2 INT,
+ value3 INT
+ )
+ UNIQUE KEY (k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1 properties("replication_num" =
"1", "light_schema_change" = "false", "disable_auto_compaction" = "true");
+ """
+ // delete k1 = 2
+ sql "insert into ${tbName1} values(1,1,1,1);"
+ sql "insert into ${tbName1} values(1,1,1,2);"
+ sql "insert into ${tbName1} values(2,2,2,2);"
+ sql "delete from ${tbName1} where k1 = 2;"
+ sql "insert into ${tbName1} values(3,3,3,3);"
+ sql "insert into ${tbName1} values(4,4,4,4);"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} order by k1;"
+
+ // drop value3
+ sql """
+ ALTER TABLE ${tbName1}
+ DROP COLUMN value3;
+ """
+ int max_try_secs = 1200
+ while (max_try_secs--) {
+ String res = getJobState(tbName1)
+ if (res == "FINISHED") {
+ break
+ } else {
+ Thread.sleep(100)
+ if (max_try_secs < 1) {
+ println "test timeout," + "state:" + res
+ assertEquals("FINISHED",res)
+ }
+ }
+ }
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} order by k1;"
+
+ // add value3 back as a CHAR(100) column
+ sql """
+ ALTER TABLE ${tbName1}
+ ADD COLUMN value3 CHAR(100) DEFAULT 'A';
+ """
+ max_try_secs = 1200
+ while (max_try_secs--) {
+ String res = getJobState(tbName1)
+ if (res == "FINISHED") {
+ break
+ } else {
+ Thread.sleep(100)
+ if (max_try_secs < 1) {
+ println "test timeout," + "state:" + res
+ assertEquals("FINISHED",res)
+ }
+ }
+ }
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} order by k1;"
+
+ sql "insert into ${tbName1} values(5,5,5,'B');"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} order by k1;"
+ sql "DROP TABLE ${tbName1} FORCE;"
+
+//======================= Test Light Weight Schema Change
+ sql "DROP TABLE IF EXISTS ${tbName1}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbName1} (
+ k1 INT,
+ k2 INT,
+ value1 INT,
+ value2 INT
+ )
+ UNIQUE KEY (k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1 properties("replication_num" =
"1", "light_schema_change" = "true", "disable_auto_compaction" = "true");
+ """
+ // delete k1 = 2
+ sql "insert into ${tbName1} values(1,1,1,1);"
+ sql "insert into ${tbName1} values(2,2,2,2);"
+ sql "delete from ${tbName1} where k1 = 2;"
+ sql "insert into ${tbName1} values(3,3,3,3);"
+ sql "insert into ${tbName1} values(4,4,4,4);"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} where value2=3 order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} where value2=3 order by k1;"
+
+ // drop k2
+ sql """
+ ALTER TABLE ${tbName1}
+ DROP COLUMN k2;
+ """
+ max_try_secs = 1200
+ while (max_try_secs--) {
+ String res = getJobState(tbName1)
+ if (res == "FINISHED") {
+ break
+ } else {
+ Thread.sleep(100)
+ if (max_try_secs < 1) {
+ println "test timeout," + "state:" + res
+ assertEquals("FINISHED",res)
+ }
+ }
+ }
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} where value1=3 order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} where value1=3 order by k1;"
+
+ // add value3 as a CHAR(100) column
+ sql """
+ ALTER TABLE ${tbName1}
+ ADD COLUMN value3 CHAR(100) DEFAULT 'A';
+ """
+ max_try_secs = 1200
+ while (max_try_secs--) {
+ String res = getJobState(tbName1)
+ if (res == "FINISHED") {
+ break
+ } else {
+ Thread.sleep(100)
+ if (max_try_secs < 1) {
+ println "test timeout," + "state:" + res
+ assertEquals("FINISHED",res)
+ }
+ }
+ }
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} where value1=4 order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} where value1=4 order by k1;"
+
+ sql "insert into ${tbName1} values(5,5,5,'B');"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} order by k1;"
+
+ // Do a schema change that is not a light weight schema change
+ sql """
+ ALTER TABLE ${tbName1}
+ ADD COLUMN k2 CHAR(10) KEY DEFAULT 'A';
+ """
+ max_try_secs = 1200
+ while (max_try_secs--) {
+ String res = getJobState(tbName1)
+ if (res == "FINISHED") {
+ break
+ } else {
+ Thread.sleep(100)
+ if (max_try_secs < 1) {
+ println "test timeout," + "state:" + res
+ assertEquals("FINISHED",res)
+ }
+ }
+ }
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=true) */ * from
${tbName1} where value1=4 order by k1;"
+ qt_sql "select /*+ SET_VAR(enable_vectorized_engine=false) */ * from
${tbName1} where value1=4 order by k1;"
+ sql "DROP TABLE ${tbName1} FORCE;"
+
+}
diff --git
a/regression-test/suites/schema_change_p0/test_alter_table_column_with_delete.groovy
b/regression-test/suites/schema_change_p0/test_alter_table_column_with_delete.groovy
index 59496990a1..9305cf626b 100644
---
a/regression-test/suites/schema_change_p0/test_alter_table_column_with_delete.groovy
+++
b/regression-test/suites/schema_change_p0/test_alter_table_column_with_delete.groovy
@@ -36,7 +36,7 @@ suite("test_alter_table_column_with_delete") {
sql "delete from ${tbName1} where k1 = 2;"
sql "insert into ${tbName1} values(3,3);"
sql "insert into ${tbName1} values(4,4);"
- qt_sql "select * from ${tbName1};"
+ qt_sql "select * from ${tbName1} order by k1;"
sql """
@@ -58,6 +58,6 @@ suite("test_alter_table_column_with_delete") {
}
sql "insert into ${tbName1} values(5,'abc');"
- qt_sql "select * from ${tbName1};"
+ qt_sql "select * from ${tbName1} order by k1;"
sql "DROP TABLE ${tbName1} FORCE;"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]