This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new f47e76d2e4f (cloud-merge) Fix stale schema version in delete predicate (#36083)
f47e76d2e4f is described below
commit f47e76d2e4fa9de8e6dbfdf9278eba43cb7aa15d
Author: Lightman <[email protected]>
AuthorDate: Tue Jun 11 09:49:43 2024 +0800
(cloud-merge) Fix stale schema version in delete predicate (#36083)
1. Fix stale schema version in rowset meta with delete predicate
2. Avoid coredump when accessing a non-existent column name in tablet schema
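The BE side of fix 2 is an API change: TabletSchema::column(const std::string&) now returns Result<const TabletColumn*> instead of a const TabletColumn&, so looking up a column name that no longer exists surfaces an error Status instead of tripping a DCHECK and coredumping; the touched call sites (base_tablet.cpp, delete_handler.cpp, tablet_reader.cpp, runtime_predicate.*) unwrap it with DORIS_TRY. Fix 1 threads the index schema version from the FE (DeleteJob/SparkLoadJob via PushTask) through the new optional TPushReq.schema_version field so CloudDeleteTask can stamp the correct version onto the tablet schema used to build the delete predicate. A minimal sketch of the new caller-side lookup pattern follows; the function name resolve_unique_id and the include paths are illustrative assumptions, not part of this patch:

    #include <string>

    #include "common/status.h"      // Status, Result, DORIS_TRY (assumed location)
    #include "olap/tablet_schema.h" // TabletSchema, TabletColumn (assumed location)

    // Sketch only: resolve a column's unique id by name with the Result-based lookup.
    doris::Status resolve_unique_id(const doris::TabletSchemaSPtr& schema,
                                    const std::string& name, int32_t* uid) {
        // If the column is missing, DORIS_TRY returns the error Status to our caller
        // instead of dereferencing an invalid lookup as the old API could.
        const auto& column = *DORIS_TRY(schema->column(name));
        *uid = column.unique_id();
        return doris::Status::OK();
    }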
---
be/src/cloud/cloud_delete_task.cpp | 7 ++
be/src/olap/base_tablet.cpp | 10 +--
be/src/olap/data_dir.cpp | 3 +-
be/src/olap/delete_handler.cpp | 15 ++--
be/src/olap/delete_handler.h | 2 +-
be/src/olap/rowset/segment_v2/segment_writer.cpp | 5 +-
.../rowset/segment_v2/vertical_segment_writer.cpp | 5 +-
be/src/olap/tablet_reader.cpp | 10 +--
be/src/olap/tablet_schema.cpp | 21 +++---
be/src/olap/tablet_schema.h | 2 +-
be/src/runtime/runtime_predicate.cpp | 8 +--
be/src/runtime/runtime_predicate.h | 10 +--
.../main/java/org/apache/doris/load/DeleteJob.java | 10 +--
.../org/apache/doris/load/loadv2/SparkLoadJob.java | 9 ++-
.../main/java/org/apache/doris/task/PushTask.java | 10 +--
gensrc/thrift/AgentService.thrift | 1 +
regression-test/data/delete_p0/test_delete.out | 3 +
.../suites/delete_p0/test_delete.groovy | 5 ++
.../test_delete_schema_change_2.groovy | 82 ++++++++++++++++++++++
19 files changed, 170 insertions(+), 48 deletions(-)
diff --git a/be/src/cloud/cloud_delete_task.cpp b/be/src/cloud/cloud_delete_task.cpp
index abf8c63a1da..210e89b838d 100644
--- a/be/src/cloud/cloud_delete_task.cpp
+++ b/be/src/cloud/cloud_delete_task.cpp
@@ -39,6 +39,11 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine, const TPushReq& requ
auto tablet = DORIS_TRY(engine.tablet_mgr().get_tablet(request.tablet_id));
+ if (!request.__isset.schema_version) {
+ return Status::InternalError("No valid schema version in request, tablet_id={}",
+ tablet->tablet_id());
+ }
+
using namespace std::chrono;
tablet->last_load_time_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
@@ -54,7 +59,9 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine, const TPushReq& requ
// check delete condition if push for delete
DeletePredicatePB del_pred;
auto tablet_schema = std::make_shared<TabletSchema>();
+ // FIXME(plat1ko): Rewrite columns updating logic
tablet_schema->update_tablet_columns(*tablet->tablet_schema(), request.columns_desc);
+ tablet_schema->set_schema_version(request.schema_version);
RETURN_IF_ERROR(DeleteHandler::generate_delete_predicate(*tablet_schema,
request.delete_conditions, &del_pred));
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 1d9a15709d4..772feb6f450 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -452,8 +452,8 @@ Status BaseTablet::lookup_row_data(const Slice& encoded_key, const RowLocation&
CHECK(tablet_schema->store_row_column());
SegmentCacheHandle segment_cache_handle;
std::unique_ptr<segment_v2::ColumnIterator> column_iterator;
- RETURN_IF_ERROR(_get_segment_column_iterator(rowset, row_location.segment_id,
- tablet_schema->column(BeConsts::ROW_STORE_COL),
+ const auto& column = *DORIS_TRY(tablet_schema->column(BeConsts::ROW_STORE_COL));
+ RETURN_IF_ERROR(_get_segment_column_iterator(rowset, row_location.segment_id, column,
&segment_cache_handle, &column_iterator, &stats));
// get and parse tuple row
vectorized::MutableColumnPtr column_ptr = vectorized::ColumnString::create();
@@ -871,9 +871,9 @@ Status BaseTablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset,
SegmentCacheHandle segment_cache_handle;
std::unique_ptr<segment_v2::ColumnIterator> column_iterator;
OlapReaderStatistics stats;
- RETURN_IF_ERROR(_get_segment_column_iterator(rowset, segid,
- tablet_schema.column(BeConsts::ROW_STORE_COL),
- &segment_cache_handle, &column_iterator, &stats));
+ const auto& column = *DORIS_TRY(tablet_schema.column(BeConsts::ROW_STORE_COL));
+ RETURN_IF_ERROR(_get_segment_column_iterator(rowset, segid, column, &segment_cache_handle,
+ &column_iterator, &stats));
// get and parse tuple row
vectorized::MutableColumnPtr column_ptr = vectorized::ColumnString::create();
RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), rowids.size(), column_ptr));
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index 113c5bc9c4b..fed3ccb2b84 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -375,7 +375,8 @@ Status DataDir::load() {
(!delete_pred->in_predicates().empty() &&
delete_pred->in_predicates()[0].has_column_unique_id())) {
// convert pred and write only when delete sub pred v2 is not set or there is in list pred to be set column uid
- DeleteHandler::convert_to_sub_pred_v2(delete_pred, rowset_meta->tablet_schema());
+ RETURN_IF_ERROR(DeleteHandler::convert_to_sub_pred_v2(
+ delete_pred, rowset_meta->tablet_schema()));
LOG(INFO) << fmt::format(
"convert rowset with old delete pred: rowset_id={}, tablet_id={}",
rowset_id.to_string(), tablet_uid.to_string());
diff --git a/be/src/olap/delete_handler.cpp b/be/src/olap/delete_handler.cpp
index 6e390874126..73a2e3b1967 100644
--- a/be/src/olap/delete_handler.cpp
+++ b/be/src/olap/delete_handler.cpp
@@ -144,14 +144,15 @@ Status DeleteHandler::generate_delete_predicate(const TabletSchema& schema,
return Status::OK();
}
-void DeleteHandler::convert_to_sub_pred_v2(DeletePredicatePB* delete_pred,
- TabletSchemaSPtr schema) {
+Status DeleteHandler::convert_to_sub_pred_v2(DeletePredicatePB* delete_pred,
+ TabletSchemaSPtr schema) {
if (!delete_pred->sub_predicates().empty() && delete_pred->sub_predicates_v2().empty()) {
for (const auto& condition_str : delete_pred->sub_predicates()) {
auto* sub_pred = delete_pred->add_sub_predicates_v2();
TCondition condition;
static_cast<void>(parse_condition(condition_str, &condition));
- sub_pred->set_column_unique_id(schema->column(condition.column_name).unique_id());
+ const auto& column = *DORIS_TRY(schema->column(condition.column_name));
+ sub_pred->set_column_unique_id(column.unique_id());
sub_pred->set_column_name(condition.column_name);
sub_pred->set_op(condition.condition_op);
sub_pred->set_cond_value(condition.condition_values[0]);
@@ -160,8 +161,10 @@ void DeleteHandler::convert_to_sub_pred_v2(DeletePredicatePB* delete_pred,
auto* in_pred_list = delete_pred->mutable_in_predicates();
for (auto& in_pred : *in_pred_list) {
- in_pred.set_column_unique_id(schema->column(in_pred.column_name()).unique_id());
+ const auto& column = *DORIS_TRY(schema->column(in_pred.column_name()));
+ in_pred.set_column_unique_id(column.unique_id());
}
+ return Status::OK();
}
bool DeleteHandler::is_condition_value_valid(const TabletColumn& column,
@@ -353,7 +356,9 @@ Status DeleteHandler::_parse_column_pred(TabletSchemaSPtr complete_schema,
if constexpr (std::is_same_v<SubPredType, DeletePredicatePB>) {
col_unique_id = sub_predicate.col_unique_id;
} else {
- col_unique_id = delete_pred_related_schema->column(condition.column_name).unique_id();
+ const auto& column =
+ *DORIS_TRY(delete_pred_related_schema->column(condition.column_name));
+ col_unique_id = column.unique_id();
}
condition.__set_column_unique_id(col_unique_id);
const auto& column = complete_schema->column_by_uid(col_unique_id);
diff --git a/be/src/olap/delete_handler.h b/be/src/olap/delete_handler.h
index a2b38cd1548..0910795a81c 100644
--- a/be/src/olap/delete_handler.h
+++ b/be/src/olap/delete_handler.h
@@ -61,7 +61,7 @@ public:
const std::vector<TCondition>& conditions,
DeletePredicatePB* del_pred);
- static void convert_to_sub_pred_v2(DeletePredicatePB* delete_pred, TabletSchemaSPtr schema);
+ static Status convert_to_sub_pred_v2(DeletePredicatePB* delete_pred, TabletSchemaSPtr schema);
/**
* Use regular expression to extract 'column_name', 'op' and 'operands'
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 7e80195477b..78b0a9d2134 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -744,8 +744,9 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
mutable_full_columns[cids_missing[i]].get());
nullable_column->insert_null_elements(1);
} else if (_tablet_schema->auto_increment_column() == tablet_column.name()) {
- DCHECK(_opts.rowset_ctx->tablet_schema->column(tablet_column.name()).type() ==
- FieldType::OLAP_FIELD_TYPE_BIGINT);
+ const auto& column = *DORIS_TRY(
+ _opts.rowset_ctx->tablet_schema->column(tablet_column.name()));
+ DCHECK(column.type() == FieldType::OLAP_FIELD_TYPE_BIGINT);
auto auto_inc_column = assert_cast<vectorized::ColumnInt64*>(
mutable_full_columns[cids_missing[i]].get());
auto_inc_column->insert(
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 9f1d2736b88..f1709226d01 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -670,8 +670,9 @@ Status VerticalSegmentWriter::_fill_missing_columns(
mutable_full_columns[missing_cids[i]].get());
nullable_column->insert_null_elements(1);
} else if (_tablet_schema->auto_increment_column() == tablet_column.name()) {
- DCHECK(_opts.rowset_ctx->tablet_schema->column(tablet_column.name()).type() ==
- FieldType::OLAP_FIELD_TYPE_BIGINT);
+ const auto& column = *DORIS_TRY(
+ _opts.rowset_ctx->tablet_schema->column(tablet_column.name()));
+ DCHECK(column.type() == FieldType::OLAP_FIELD_TYPE_BIGINT);
auto auto_inc_column = assert_cast<vectorized::ColumnInt64*>(
mutable_full_columns[missing_cids[i]].get());
auto_inc_column->insert(
diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp
index 0c5829bb5a5..e65a10ac73e 100644
--- a/be/src/olap/tablet_reader.cpp
+++ b/be/src/olap/tablet_reader.cpp
@@ -491,10 +491,11 @@ Status TabletReader::_init_conditions_param(const ReaderParams& read_params) {
RETURN_IF_ERROR(_tablet_schema->have_column(tmp_cond.column_name));
// The "column" parameter might represent a column resulting from the decomposition of a variant column.
// Instead of using a "unique_id" for identification, we are utilizing a "path" to denote this column.
- const auto& column = materialize_column(_tablet_schema->column(tmp_cond.column_name));
+ const auto& column = *DORIS_TRY(_tablet_schema->column(tmp_cond.column_name));
+ const auto& mcolumn = materialize_column(column);
uint32_t index = _tablet_schema->field_index(tmp_cond.column_name);
ColumnPredicate* predicate =
- parse_to_predicate(column, index, tmp_cond, _predicate_arena.get());
+ parse_to_predicate(mcolumn, index, tmp_cond, _predicate_arena.get());
// record condition value into predicate_params in order to pushdown segment_iterator,
// _gen_predicate_result_sign will build predicate result unique sign with condition value
auto predicate_params = predicate->predicate_params();
@@ -566,10 +567,11 @@ Status TabletReader::_init_conditions_param_except_leafnode_of_andnode(
const ReaderParams& read_params) {
for (const auto& condition : read_params.conditions_except_leafnode_of_andnode) {
TCondition tmp_cond = condition;
- const auto& column = materialize_column(_tablet_schema->column(tmp_cond.column_name));
+ const auto& column = *DORIS_TRY(_tablet_schema->column(tmp_cond.column_name));
+ const auto& mcolumn = materialize_column(column);
uint32_t index = _tablet_schema->field_index(tmp_cond.column_name);
ColumnPredicate* predicate =
- parse_to_predicate(column, index, tmp_cond, _predicate_arena.get());
+ parse_to_predicate(mcolumn, index, tmp_cond, _predicate_arena.get());
if (predicate != nullptr) {
auto predicate_params = predicate->predicate_params();
predicate_params->marked_by_runtime_filter = condition.marked_by_runtime_filter;
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 37876440b70..89d154df9e3 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -1127,9 +1127,9 @@ TabletSchemaSPtr TabletSchema::copy_without_extracted_columns() {
bool TabletSchema::is_dropped_column(const TabletColumn& col) const {
CHECK(_field_id_to_index.find(col.unique_id()) != _field_id_to_index.end())
<< "could not find col with unique id = " << col.unique_id()
- << " and name = " << col.name();
- return _field_name_to_index.find(StringRef(col.name())) == _field_name_to_index.end() ||
- column(col.name()).unique_id() != col.unique_id();
+ << " and name = " << col.name() << " table_id=" << _table_id;
+ auto it = _field_name_to_index.find(StringRef {col.name()});
+ return it == _field_name_to_index.end() || _cols[it->second]->unique_id() != col.unique_id();
}
void TabletSchema::copy_extracted_columns(const TabletSchema& src_schema) {
@@ -1269,11 +1269,16 @@ Status TabletSchema::have_column(const std::string& field_name) const {
return Status::OK();
}
-const TabletColumn& TabletSchema::column(const std::string& field_name) const {
- DCHECK(_field_name_to_index.contains(StringRef(field_name)) != 0)
- << ", field_name=" << field_name << ", field_name_to_index=" << get_all_field_names();
- const auto& found = _field_name_to_index.find(StringRef(field_name));
- return *_cols[found->second];
+Result<const TabletColumn*> TabletSchema::column(const std::string& field_name) const {
+ auto it = _field_name_to_index.find(StringRef {field_name});
+ if (it == _field_name_to_index.end()) {
+ DCHECK(false) << "field_name=" << field_name << ", table_id=" << _table_id
+ << ", field_name_to_index=" << get_all_field_names();
+ return ResultError(
+ Status::InternalError("column not found, name={}, table_id={}, schema_version={}",
+ field_name, _table_id, _schema_version));
+ }
+ return _cols[it->second].get();
}
std::vector<const TabletIndex*> TabletSchema::get_indexes_for_column(
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index a355f99d23d..4dc5c5e521b 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -311,7 +311,7 @@ public:
int32_t field_index(const vectorized::PathInData& path) const;
int32_t field_index(int32_t col_unique_id) const;
const TabletColumn& column(size_t ordinal) const;
- const TabletColumn& column(const std::string& field_name) const;
+ Result<const TabletColumn*> column(const std::string& field_name) const;
Status have_column(const std::string& field_name) const;
const TabletColumn& column_by_uid(int32_t col_unique_id) const;
TabletColumn& mutable_column_by_uid(int32_t col_unique_id);
diff --git a/be/src/runtime/runtime_predicate.cpp b/be/src/runtime/runtime_predicate.cpp
index f90a5743fdd..b746ffabb3b 100644
--- a/be/src/runtime/runtime_predicate.cpp
+++ b/be/src/runtime/runtime_predicate.cpp
@@ -217,15 +217,15 @@ Status RuntimePredicate::update(const Field& value) {
if (!updated) {
return Status::OK();
}
-
for (auto p : _contexts) {
auto ctx = p.second;
if (!ctx.tablet_schema) {
continue;
}
- std::unique_ptr<ColumnPredicate> pred {_pred_constructor(
- ctx.tablet_schema->column(ctx.col_name), ctx.predicate->column_id(),
- _get_value_fn(_orderby_extrem), false, &_predicate_arena)};
+ const auto& column = *DORIS_TRY(ctx.tablet_schema->column(ctx.col_name));
+ std::unique_ptr<ColumnPredicate> pred {_pred_constructor(column, ctx.predicate->column_id(),
+ _get_value_fn(_orderby_extrem),
+ false, &_predicate_arena)};
// For NULLS FIRST, wrap a AcceptNullPredicate to return true for NULL
// since ORDER BY ASC/DESC should get NULL first but pred returns NULL
diff --git a/be/src/runtime/runtime_predicate.h b/be/src/runtime/runtime_predicate.h
index 73ed657c0bb..eab28fdde42 100644
--- a/be/src/runtime/runtime_predicate.h
+++ b/be/src/runtime/runtime_predicate.h
@@ -66,8 +66,9 @@ public:
}
RETURN_IF_ERROR(tablet_schema->have_column(_contexts[target_node_id].col_name));
_contexts[target_node_id].tablet_schema = tablet_schema;
- _contexts[target_node_id].predicate =
- SharedPredicate::create_shared(_contexts[target_node_id].get_field_index());
+ int64_t index = DORIS_TRY(_contexts[target_node_id].get_field_index())
+ _contexts[target_node_id]
+ .predicate = SharedPredicate::create_shared(index);
return Status::OK();
}
@@ -131,8 +132,9 @@ private:
TabletSchemaSPtr tablet_schema;
std::shared_ptr<ColumnPredicate> predicate;
- int32_t get_field_index() {
- return tablet_schema->field_index(tablet_schema->column(col_name).unique_id());
+ Result<int32_t> get_field_index() {
+ const auto& column = *DORIS_TRY(tablet_schema->column(col_name));
+ return tablet_schema->field_index(column.unique_id());
}
bool target_is_slot() const { return expr.nodes[0].node_type == TExprNodeType::SLOT_REF; }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
index 04968076ba9..c766f107aa1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
@@ -299,11 +300,12 @@ public class DeleteJob extends AbstractTxnStateChangeCallback implements DeleteJ
for (Partition partition : partitions) {
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
long indexId = index.getId();
- int schemaHash = targetTbl.getSchemaHashByIndexId(indexId);
-
+ MaterializedIndexMeta indexMeta = targetTbl.getIndexMetaByIndexId(indexId);
+ int schemaVersion = indexMeta.getSchemaVersion();
+ int schemaHash = indexMeta.getSchemaHash();
List<TColumn> columnsDesc = Lists.newArrayList();
// using to update schema of the rowset, so full columns should be included
- for (Column column : targetTbl.getSchemaByIndexId(indexId, true)) {
+ for (Column column : indexMeta.getSchema(true)) {
columnsDesc.add(column.toThrift());
}
@@ -350,7 +352,7 @@ public class DeleteJob extends AbstractTxnStateChangeCallback implements DeleteJ
transactionId,
Env.getCurrentEnv().getNextId() + 10000000000L,
columnsDesc,
- vaultId);
+ vaultId, schemaVersion);
pushTask.setIsSchemaChanging(false);
pushTask.setCountDownLatch(countDownLatch);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index bc539f6b54e..61687e1c4b4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -32,6 +32,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PrimitiveType;
@@ -467,10 +468,12 @@ public class SparkLoadJob extends BulkLoadJob {
List<MaterializedIndex> indexes = partition.getMaterializedIndices(IndexExtState.ALL);
for (MaterializedIndex index : indexes) {
long indexId = index.getId();
- int schemaHash = indexToSchemaHash.get(indexId);
+ MaterializedIndexMeta indexMeta = olapTable.getIndexMetaByIndexId(indexId);
+ int schemaVersion = indexMeta.getSchemaVersion();
+ int schemaHash = indexMeta.getSchemaHash();
List<TColumn> columnsDesc = new ArrayList<TColumn>();
- for (Column column : olapTable.getSchemaByIndexId(indexId)) {
+ for (Column column : indexMeta.getSchema(true)) {
TColumn tColumn = column.toThrift();
tColumn.setColumnName(tColumn.getColumnName().toLowerCase(Locale.ROOT));
columnsDesc.add(tColumn);
@@ -528,7 +531,7 @@ public class SparkLoadJob extends BulkLoadJob {
partitionId, indexId, tabletId, replicaId, schemaHash, 0, id, TPushType.LOAD_V2,
TPriority.NORMAL, transactionId, taskSignature, tBrokerScanRange,
params.tDescriptorTable, columnsDesc,
- vaultId);
+ vaultId, schemaVersion);
if (AgentTaskQueue.addTask(pushTask)) {
batchTask.addTask(pushTask);
if (!tabletToSentReplicaPushTask.containsKey(tabletId)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
index 7bcad85e9f4..5cf428e7228 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
@@ -76,6 +76,7 @@ public class PushTask extends AgentTask {
private TDescriptorTable tDescriptorTable;
// for light schema change
+ private int schemaVersion;
private List<TColumn> columnsDesc = null;
private String vaultId;
@@ -84,7 +85,7 @@ public class PushTask extends AgentTask {
long tabletId, long replicaId, int schemaHash, long version, String filePath, long fileSize,
int timeoutSecond, long loadJobId, TPushType pushType, List<Predicate> conditions, boolean needDecompress,
TPriority priority, TTaskType taskType, long transactionId, long signature, List<TColumn> columnsDesc,
- String vaultId) {
+ String vaultId, int schemaVersion) {
super(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, signature);
this.replicaId = replicaId;
this.schemaHash = schemaHash;
@@ -105,16 +106,17 @@ public class PushTask extends AgentTask {
this.tDescriptorTable = null;
this.columnsDesc = columnsDesc;
this.vaultId = vaultId;
+ this.schemaVersion = schemaVersion;
}
// for load v2 (SparkLoadJob)
public PushTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
long replicaId, int schemaHash, int timeoutSecond, long loadJobId, TPushType pushType, TPriority priority,
long transactionId, long signature, TBrokerScanRange tBrokerScanRange, TDescriptorTable tDescriptorTable,
- List<TColumn> columnsDesc, String vaultId) {
+ List<TColumn> columnsDesc, String vaultId, int schemaVersion) {
this(null, backendId, dbId, tableId, partitionId, indexId, tabletId, replicaId, schemaHash, -1, null, 0,
timeoutSecond, loadJobId, pushType, null, false, priority, TTaskType.REALTIME_PUSH, transactionId,
- signature, columnsDesc, vaultId);
+ signature, columnsDesc, vaultId, schemaVersion);
this.tBrokerScanRange = tBrokerScanRange;
this.tDescriptorTable = tDescriptorTable;
}
@@ -197,7 +199,7 @@ public class PushTask extends AgentTask {
}
request.setColumnsDesc(columnsDesc);
request.setStorageVaultId(this.vaultId);
-
+ request.setSchemaVersion(schemaVersion);
return request;
}
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index 587715d43cd..104adca70fa 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -273,6 +273,7 @@ struct TPushReq {
15: optional Descriptors.TDescriptorTable desc_tbl
16: optional list<Descriptors.TColumn> columns_desc
17: optional string storage_vault_id
+ 18: optional i32 schema_version
}
struct TCloneReq {
diff --git a/regression-test/data/delete_p0/test_delete.out b/regression-test/data/delete_p0/test_delete.out
index 08104b003c8..becddb7d97b 100644
--- a/regression-test/data/delete_p0/test_delete.out
+++ b/regression-test/data/delete_p0/test_delete.out
@@ -14,6 +14,9 @@
-- !sql --
8
+-- !sql --
+8
+
-- !sql1 --
abcdef 2022-08-12 2022-08-16T12:11:11 2022-08-16T12:11:11.111 2022-08-12 2022-08-16T12:11:11 2022-08-16T12:11:11.111
diff --git a/regression-test/suites/delete_p0/test_delete.groovy b/regression-test/suites/delete_p0/test_delete.groovy
index c0a5e0fbfe0..62553571357 100644
--- a/regression-test/suites/delete_p0/test_delete.groovy
+++ b/regression-test/suites/delete_p0/test_delete.groovy
@@ -32,6 +32,11 @@ suite("test_delete") {
qt_sql """select count(c1) from ${tableName};"""
qt_sql """select count(c1) from ${tableName} where c1 = 'abcdef';"""
+ // test delete after light schema change
+ sql """ALTER TABLE ${tableName} ADD COLUMN c4 int;"""
+ sql """delete from ${tableName} where `c4` = 1;"""
+ qt_sql """select count(*) from ${tableName};"""
+
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """ CREATE TABLE IF NOT EXISTS delete_regression_test (k1 varchar(190) NOT NULL COMMENT "", k2 DATEV2 NOT NULL COMMENT "", k3 DATETIMEV2 NOT NULL COMMENT "", k4 DATETIMEV2(3) NOT NULL COMMENT "", v1 DATEV2 NOT NULL COMMENT "", v2 DATETIMEV2 NOT NULL COMMENT "", v3 DATETIMEV2(3) NOT NULL COMMENT "" ) ENGINE=OLAP DUPLICATE KEY(k1, k2, k3, k4) COMMENT "OLAP" DISTRIBUTED BY HASH(k1, k2, k3, k4) BUCKETS 3
diff --git a/regression-test/suites/schema_change_p0/test_delete_schema_change_2.groovy b/regression-test/suites/schema_change_p0/test_delete_schema_change_2.groovy
new file mode 100644
index 00000000000..88a909127de
--- /dev/null
+++ b/regression-test/suites/schema_change_p0/test_delete_schema_change_2.groovy
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite('test_delete_schema_chang_2') {
+ sql 'DROP TABLE IF EXISTS schema_change_delete_regression_test_2;'
+
+ sql '''
+ CREATE TABLE IF NOT EXISTS schema_change_delete_regression_test_2 (
+ `user_id` LARGEINT NOT NULL COMMENT "user id",
+ `date` DATE NOT NULL COMMENT "data import date and time",
+ `city` VARCHAR(20) COMMENT "user's city",
+ `age` SMALLINT COMMENT "user age",
+ `sex` TINYINT COMMENT "user gender",
+ `last_visit_date` DATETIME DEFAULT "1970-01-01 00:00:00" COMMENT "user's last visit time",
+ `last_update_date` DATETIME DEFAULT "1970-01-01 00:00:00" COMMENT "user's last update time",
+ `last_visit_date_not_null` DATETIME NOT NULL DEFAULT "1970-01-01 00:00:00" COMMENT "user's last visit time",
+ `cost` BIGINT DEFAULT "0" COMMENT "user's total spending",
+ `max_dwell_time` INT DEFAULT "0" COMMENT "user's maximum dwell time",
+ `min_dwell_time` INT DEFAULT "99999" COMMENT "user's minimum dwell time")
+ DUPLICATE KEY(`user_id`, `date`, `city`, `age`, `sex`) DISTRIBUTED BY HASH(`user_id`)
+ BUCKETS 8
+ PROPERTIES ( "replication_num" = "1", "light_schema_change" = "false" );
+ '''
+
+ sql """
+ INSERT INTO schema_change_delete_regression_test_2 VALUES
+ (1, '2017-10-01', 'Beijing', 10, 1, '2020-01-01', '2020-01-01', '2020-01-01', 1, 30, 20);
+ """
+
+ sql """
+ INSERT INTO schema_change_delete_regression_test_2 VALUES
+ (2, '2017-10-01', 'Beijing', 10, 1, '2020-01-02', '2020-01-02', '2020-01-02', 1, 31, 21);
+ """
+
+ sql 'SELECT * FROM schema_change_delete_regression_test_2 order by user_id ASC, last_visit_date;'
+
+ if (!isCloudMode()) {
+ sql "ALTER TABLE schema_change_delete_regression_test_2 SET ('light_schema_change' = 'true');"
+ }
+
+ sql "ALTER table schema_change_delete_regression_test_2 ADD COLUMN new_column INT default '1';"
+
+ sql 'DELETE FROM schema_change_delete_regression_test_2 where new_column = 1;'
+
+ sql 'SELECT * FROM schema_change_delete_regression_test_2 order by user_id DESC, last_visit_date;'
+
+ sql """
+ INSERT INTO schema_change_delete_regression_test_2 VALUES
+ (1, '2017-10-01', 'Beijing', 10, 1, '2020-01-02', '2020-01-02', '2020-01-02', 1, 31, 19, 2);
+ """
+
+ sql """
+ INSERT INTO schema_change_delete_regression_test_2 VALUES
+ (2, '2017-10-01', 'Beijing', 10, 1, '2020-01-03', '2020-01-03', '2020-01-03', 1, 32, 20, 2);
+ """
+
+ sql """
+ INSERT INTO schema_change_delete_regression_test_2 VALUES
+ (3, '2017-10-01', 'Beijing', 10, 1, '2020-01-03', '2020-01-03', '2020-01-03', 1, 32, 20, 1);
+ """
+
+ sql """
+ INSERT INTO schema_change_delete_regression_test_2 VALUES
+ (3, '2017-10-01', 'Beijing', 10, 1, '2020-01-03', '2020-01-03', '2020-01-03', 1, 32, 20, 2);
+ """
+
+ sql 'SELECT * FROM schema_change_delete_regression_test_2 order by user_id DESC, last_visit_date;'
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]