This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 2e64491ee38 [branch-2.1](insert-overwrite) Support create partition for auto partition table when insert overwrite (#38628) (#42644)
2e64491ee38 is described below
commit 2e64491ee38f4248a11e9b6bd29cd5b37523b913
Author: zclllhhjj <[email protected]>
AuthorDate: Wed Nov 13 11:16:00 2024 +0800
[branch-2.1](insert-overwrite) Support create partition for auto partition table when insert overwrite (#38628) (#42644)
pick https://github.com/apache/doris/pull/38628
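
    In short: with the session variable enable_auto_create_when_overwrite enabled,
    an INSERT OVERWRITE on an auto partition table now creates partitions for rows
    that match no existing partition instead of failing. A minimal usage sketch,
    distilled from the regression tests added below (table and value names are
    illustrative):

        set enable_auto_create_when_overwrite = true;

        create table auto_list (k0 varchar null)
        auto partition by list (k0)
        (
            PARTITION p1 values in (("Beijing"))
        )
        DISTRIBUTED BY HASH(`k0`) BUCKETS 1
        properties("replication_num" = "1");

        -- "Shanghai" has no matching partition yet; with the variable enabled,
        -- both the whole-table form and the auto-detect partition(*) form create
        -- missing partitions on the fly.
        insert overwrite table auto_list values ("Shanghai");
        insert overwrite table auto_list partition(*) values ("Shanghai");

    With the variable off (the default), such rows are rejected as before.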
---
be/src/exec/tablet_info.cpp | 1 +
be/src/pipeline/exec/exchange_sink_operator.cpp | 2 +-
be/src/vec/sink/vrow_distribution.cpp | 177 ++++++++++++++-------
be/src/vec/sink/vrow_distribution.h | 9 +-
be/src/vec/sink/writer/vtablet_writer.cpp | 2 +-
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 2 +-
.../apache/doris/analysis/NativeInsertStmt.java | 6 -
.../doris/nereids/parser/LogicalPlanBuilder.java | 4 +
.../insert/InsertOverwriteTableCommand.java | 33 ++--
.../java/org/apache/doris/qe/SessionVariable.java | 20 ++-
.../java/org/apache/doris/qe/StmtExecutor.java | 28 +---
gensrc/thrift/PaloInternalService.thrift | 2 +
.../test_iot_overwrite_and_create.out | 24 +++
.../test_iot_overwrite_and_create_many.out | 15 ++
.../test_iot_overwrite_and_create.groovy | 71 +++++++++
.../test_iot_overwrite_and_create_many.groovy | 64 ++++++++
16 files changed, 356 insertions(+), 104 deletions(-)
diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index 3c934ad0831..53cabb5305f 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -729,6 +729,7 @@ Status VOlapTablePartitionParam::replace_partitions(
// add new partitions with new id.
_partitions.emplace_back(part);
+ VLOG_NOTICE << "params add new partition " << part->id;
// replace items in _partition_maps
if (_is_in_partition) {
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 9958a0b6fc1..b26c69ad560 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -324,7 +324,7 @@ Status ExchangeSinkLocalState::_send_new_partition_batch() {
        vectorized::Block tmp_block =
                _row_distribution._batching_block->to_block(); // Borrow out, for lval ref
auto& p = _parent->cast<ExchangeSinkOperatorX>();
-        // these order is only.
+        // this order is required.
        // 1. clear batching stats (and the flag goes true) so that we won't start a new batching process while dealing with the batched block.
        // 2. deal with the batched block
        // 3. now reuse the columns of the lval block, because write doesn't really adjust it; it generates a new block from that.
diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp
index 741148b66f5..025e13edff0 100644
--- a/be/src/vec/sink/vrow_distribution.cpp
+++ b/be/src/vec/sink/vrow_distribution.cpp
@@ -23,7 +23,7 @@
#include <cstdint>
#include <memory>
-#include <sstream>
+#include <string>
#include "common/status.h"
#include "runtime/client_cache.h"
@@ -110,6 +110,10 @@ Status VRowDistribution::automatic_create_partition() {
if (result.status.status_code == TStatusCode::OK) {
// add new created partitions
RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions));
+ for (const auto& part : result.partitions) {
+ _new_partition_ids.insert(part.id);
+ VLOG_TRACE << "record new id: " << part.id;
+ }
RETURN_IF_ERROR(_create_partition_callback(_caller, &result));
}
@@ -128,7 +132,7 @@ static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& arg
// use _partitions and replace them
Status VRowDistribution::_replace_overwriting_partition() {
- SCOPED_TIMER(_add_partition_request_timer);
+ SCOPED_TIMER(_add_partition_request_timer); // also for replace_partition
TReplacePartitionRequest request;
TReplacePartitionResult result;
request.__set_overwrite_group_id(_vpartition->get_overwrite_group_id());
@@ -138,16 +142,20 @@ Status VRowDistribution::_replace_overwriting_partition() {
// only request for partitions not recorded for replacement
std::set<int64_t> id_deduper;
for (const auto* part : _partitions) {
-        if (part == nullptr) [[unlikely]] {
-            return Status::InternalError(
-                    "Cannot found origin partitions in auto detect overwriting, stop processing");
-        }
-        if (_new_partition_ids.contains(part->id)) {
-            // this is a new partition. dont replace again.
-        } else {
-            // request for replacement
-            id_deduper.insert(part->id);
-        }
+        if (part != nullptr) {
+            if (_new_partition_ids.contains(part->id)) {
+                // this is a new partition. don't replace it again.
+                VLOG_TRACE << "skip new partition: " << part->id;
+            } else {
+                // request for replacement
+                id_deduper.insert(part->id);
+            }
+        } else if (_missing_map.empty()) {
+            // no origin partition, and creating one is not allowed.
+            return Status::InvalidArgument(
+                    "Cannot found origin partitions in auto detect overwriting, stop "
+                    "processing");
+        } // else: part is null and _missing_map is not empty. handled outside via the auto-partition path. nothing to do here.
}
if (id_deduper.empty()) {
return Status::OK(); // no need to request
@@ -172,6 +180,7 @@ Status VRowDistribution::_replace_overwriting_partition() {
// record new partitions
for (const auto& part : result.partitions) {
_new_partition_ids.insert(part.id);
+ VLOG_TRACE << "record new id: " << part.id;
}
// replace data in _partitions
    RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids, result.partitions));
@@ -294,6 +303,52 @@ Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition(
return Status::OK();
}
+Status VRowDistribution::_deal_missing_map(vectorized::Block* block,
+                                           const std::vector<uint16_t>& partition_cols_idx,
+                                           int64_t& rows_stat_val) {
+    // for missing partition keys, calc the missing partitions and save them in _partitions_need_create
+    auto [part_ctxs, part_exprs] = _get_partition_function();
+    auto part_col_num = part_exprs.size();
+    // the two vectors are in column-first order
+    std::vector<std::vector<std::string>> col_strs;
+    std::vector<const NullMap*> col_null_maps;
+    col_strs.resize(part_col_num);
+    col_null_maps.reserve(part_col_num);
+
+    for (int i = 0; i < part_col_num; ++i) {
+        auto return_type = part_exprs[i]->data_type();
+        // expose the data column. the return type may be nullable
+        const auto& [range_left_col, col_const] =
+                unpack_if_const(block->get_by_position(partition_cols_idx[i]).column);
+        if (range_left_col->is_nullable()) {
+            col_null_maps.push_back(&(
+                    assert_cast<const ColumnNullable*>(range_left_col.get())->get_null_map_data()));
+        } else {
+            col_null_maps.push_back(nullptr);
+        }
+        for (auto row : _missing_map) {
+            col_strs[i].push_back(
+                    return_type->to_string(*range_left_col, index_check_const(row, col_const)));
+        }
+    }
+
+    // calc the end values and save them. at the end of sending, we will create partitions for them and deal with the batched rows.
+    RETURN_IF_ERROR(
+            _save_missing_values(col_strs, part_col_num, block, _missing_map, col_null_maps));
+
+    size_t new_bt_rows = _batching_block->rows();
+    size_t new_bt_bytes = _batching_block->bytes();
+    rows_stat_val -= new_bt_rows - _batching_rows;
+    _state->update_num_rows_load_total(_batching_rows - new_bt_rows);
+    _state->update_num_bytes_load_total(_batching_bytes - new_bt_bytes);
+    DorisMetrics::instance()->load_rows->increment(_batching_rows - new_bt_rows);
+    DorisMetrics::instance()->load_bytes->increment(_batching_bytes - new_bt_bytes);
+    _batching_rows = new_bt_rows;
+    _batching_bytes = new_bt_bytes;
+
+    return Status::OK();
+}
+
Status VRowDistribution::_generate_rows_distribution_for_auto_partition(
        vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
        bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
@@ -319,63 +374,64 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_partition(
RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids));
if (!_missing_map.empty()) {
-        // for missing partition keys, calc the missing partition and save in _partitions_need_create
-        auto [part_ctxs, part_exprs] = _get_partition_function();
-        auto part_col_num = part_exprs.size();
-        // the two vectors are in column-first-order
-        std::vector<std::vector<std::string>> col_strs;
-        std::vector<const NullMap*> col_null_maps;
-        col_strs.resize(part_col_num);
-        col_null_maps.reserve(part_col_num);
-
-        for (int i = 0; i < part_col_num; ++i) {
-            auto return_type = part_exprs[i]->data_type();
-            // expose the data column. the return type would be nullable
-            const auto& [range_left_col, col_const] =
-                    unpack_if_const(block->get_by_position(partition_cols_idx[i]).column);
-            if (range_left_col->is_nullable()) {
-                col_null_maps.push_back(&(assert_cast<const ColumnNullable*>(range_left_col.get())
-                                                  ->get_null_map_data()));
-            } else {
-                col_null_maps.push_back(nullptr);
-            }
-            for (auto row : _missing_map) {
-                col_strs[i].push_back(
-                        return_type->to_string(*range_left_col, index_check_const(row, col_const)));
-            }
-        }
-
-        // calc the end value and save them. in the end of sending, we will create partitions for them and deal them.
-        RETURN_IF_ERROR(
-                _save_missing_values(col_strs, part_col_num, block, _missing_map, col_null_maps));
-
-        size_t new_bt_rows = _batching_block->rows();
-        size_t new_bt_bytes = _batching_block->bytes();
-        rows_stat_val -= new_bt_rows - _batching_rows;
-        _state->update_num_rows_load_total(_batching_rows - new_bt_rows);
-        _state->update_num_bytes_load_total(_batching_bytes - new_bt_bytes);
-        DorisMetrics::instance()->load_rows->increment(_batching_rows - new_bt_rows);
-        DorisMetrics::instance()->load_bytes->increment(_batching_bytes - new_bt_bytes);
-        _batching_rows = new_bt_rows;
-        _batching_bytes = new_bt_bytes;
+        RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val));
}
return Status::OK();
}
Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
- vectorized::Block* block, bool has_filtered_rows,
- std::vector<RowPartTabletIds>& row_part_tablet_ids) {
+        vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
+        bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
+        int64_t& rows_stat_val) {
auto num_rows = block->rows();
+    // for the non-auto-partition situation, this goes into the two 'else' branches: just find the origin partitions, replace them by rpc,
+    // and find the new partitions to use.
+    // for auto-partition tables, find and save the origins in _partitions and replace them. meanwhile, save the missing values for auto
+    // partition. then we find partitions again to get the replaced partitions in _partitions. this time _missing_map is ignored because
+    // we already saved the missing values.
bool stop_processing = false;
-    RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
-                                                 _tablet_indexes, stop_processing, _skip));
+    if (_vpartition->is_auto_partition() &&
+        _state->query_options().enable_auto_create_when_overwrite) {
+        // allow auto-creating partitions for missing rows.
+        std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys();
+        auto partition_col = block->get_by_position(partition_keys[0]);
+        _missing_map.clear();
+        _missing_map.reserve(partition_col.column->size());
+
+        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
+                                                     _tablet_indexes, stop_processing, _skip,
+                                                     &_missing_map));
+
+        // creation is allowed and really needed during auto-detect overwriting.
+        if (!_missing_map.empty()) {
+            RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val));
+        }
+    } else {
+        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
+                                                     _tablet_indexes, stop_processing, _skip));
+    }
RETURN_IF_ERROR(_replace_overwriting_partition());
// regenerate locations for new partitions & tablets
_reset_find_tablets(num_rows);
-    RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
-                                                 _tablet_indexes, stop_processing, _skip));
+    if (_vpartition->is_auto_partition() &&
+        _state->query_options().enable_auto_create_when_overwrite) {
+        // here _missing_map is just a placeholder
+        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
+                                                     _tablet_indexes, stop_processing, _skip,
+                                                     &_missing_map));
+        if (VLOG_TRACE_IS_ON) {
+            std::string tmp;
+            for (auto v : _missing_map) {
+                tmp += std::to_string(v).append(", ");
+            }
+            VLOG_TRACE << "Trace missing map of " << this << ':' << tmp;
+        }
+    } else {
+        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
+                                                     _tablet_indexes, stop_processing, _skip));
+    }
if (has_filtered_rows) {
for (int i = 0; i < num_rows; i++) {
_skip[i] = _skip[i] || _block_convertor->filter_map()[i];
@@ -446,10 +502,11 @@ Status VRowDistribution::generate_rows_distribution(
}
Status st = Status::OK();
- if (_vpartition->is_auto_detect_overwrite()) {
+ if (_vpartition->is_auto_detect_overwrite() && !_deal_batched) {
// when overwrite, no auto create partition allowed.
-        st = _generate_rows_distribution_for_auto_overwrite(block.get(), has_filtered_rows,
-                                                            row_part_tablet_ids);
+        st = _generate_rows_distribution_for_auto_overwrite(block.get(), partition_cols_idx,
+                                                            has_filtered_rows, row_part_tablet_ids,
+                                                            rows_stat_val);
} else if (_vpartition->is_auto_partition() && !_deal_batched) {
        st = _generate_rows_distribution_for_auto_partition(block.get(), partition_cols_idx,
                                                            has_filtered_rows, row_part_tablet_ids,
diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h
index fffe0e3f7f1..248982c0202 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -162,14 +162,19 @@ private:
            vectorized::Block* block, const std::vector<uint16_t>& partition_col_idx,
            bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
            int64_t& rows_stat_val);
+    // the whole process to deal with missing rows. will call _save_missing_values
+ Status _deal_missing_map(vectorized::Block* block,
+ const std::vector<uint16_t>& partition_cols_idx,
+ int64_t& rows_stat_val);
Status _generate_rows_distribution_for_non_auto_partition(
vectorized::Block* block, bool has_filtered_rows,
std::vector<RowPartTabletIds>& row_part_tablet_ids);
Status _generate_rows_distribution_for_auto_overwrite(
- vectorized::Block* block, bool has_filtered_rows,
- std::vector<RowPartTabletIds>& row_part_tablet_ids);
+            vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
+            bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
+            int64_t& rows_stat_val);
Status _replace_overwriting_partition();
    void _reset_row_part_tablet_ids(std::vector<RowPartTabletIds>& row_part_tablet_ids,
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index 29b05217529..6745c73d284 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1380,7 +1380,7 @@ Status VTabletWriter::_send_new_partition_batch() {
        Block tmp_block = _row_distribution._batching_block->to_block(); // Borrow out, for lval ref
-        // these order is only.
+        // this order is required.
        // 1. clear batching stats (and the flag goes true) so that we won't start a new batching process while dealing with the batched block.
        // 2. deal with the batched block
        // 3. now reuse the columns of the lval block, because write doesn't really adjust it; it generates a new block from that.
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index c693e20c3a8..dbc85147fe7 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -529,7 +529,7 @@ Status VTabletWriterV2::_send_new_partition_batch() {
        Block tmp_block = _row_distribution._batching_block->to_block(); // Borrow out, for lval ref
-        // these order is only.
+        // this order is required.
        // 1. clear batching stats (and the flag goes true) so that we won't start a new batching process while dealing with the batched block.
        // 2. deal with the batched block
        // 3. now reuse the columns of the lval block, because write doesn't really adjust it; it generates a new block from that.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index a21cfb45553..01d5e6b87d3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -165,7 +165,6 @@ public class NativeInsertStmt extends InsertStmt {
boolean hasEmptyTargetColumns = false;
private boolean allowAutoPartition = true;
- private boolean withAutoDetectOverwrite = false;
enum InsertType {
NATIVE_INSERT("insert_"),
@@ -331,11 +330,6 @@ public class NativeInsertStmt extends InsertStmt {
return isTransactionBegin;
}
- public NativeInsertStmt withAutoDetectOverwrite() {
- this.withAutoDetectOverwrite = true;
- return this;
- }
-
protected void preCheckAnalyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 4114200d2fa..bcd5b641a8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -593,6 +593,10 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
LogicalPlan plan = visitQuery(ctx.query());
        // partitionSpec may be NULL. means auto detect partition. only available when IOT
        Pair<Boolean, List<String>> partitionSpec = visitPartitionSpec(ctx.partitionSpec());
+        // partitionSpec.second :
+        //   null   - auto detect
+        //   empty  - whole table
+        //   others - specific partitions
boolean isAutoDetect = partitionSpec.second == null;
        LogicalSink<?> sink = UnboundTableSinkCreator.createUnboundTableSinkMaybeOverwrite(
                tableName.build(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
index b0faa6ba508..51f87a25889 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
@@ -151,6 +151,7 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
        PhysicalTableSink<?> physicalTableSink = ((PhysicalTableSink<?>) plan.get());
TableIf targetTable = physicalTableSink.getTargetTable();
List<String> partitionNames;
+ boolean wholeTable = false;
if (physicalTableSink instanceof PhysicalOlapTableSink) {
            InternalDatabaseUtil
                    .checkDatabase(((OlapTable) targetTable).getQualifiedDbName(), ConnectContext.get());
@@ -165,7 +166,10 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
}
ConnectContext.get().setSkipAuth(true);
            partitionNames = ((UnboundTableSink<?>) logicalQuery).getPartitions();
+            // If no specific partitions to overwrite are given, this is a command to overwrite the whole table.
+            // Then we execute it as overwriting every partition.
            if (CollectionUtils.isEmpty(partitionNames)) {
+                wholeTable = true;
                partitionNames = Lists.newArrayList(targetTable.getPartitionNames());
            }
} else {
@@ -183,9 +187,10 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
                // When inserting, BE will call to replace partition by FrontendService. FE will register new temp
                // partitions and return. for transactional, the replacement will really occur when insert successed,
                // i.e. `insertInto` finished. then we call taskGroupSuccess to make replacement.
- insertInto(ctx, executor, taskId);
+ insertIntoAutoDetect(ctx, executor, taskId);
                insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) targetTable);
} else {
+                // it's overwriting the whole table (as all partitions) or specific partition(s)
                List<String> tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
if (isCancelled.get()) {
LOG.info("insert overwrite is cancelled before
registerTask, queryId: {}",
@@ -207,7 +212,7 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
insertOverwriteManager.taskFail(taskId);
return;
}
-                insertInto(ctx, executor, tempPartitionNames);
+                insertIntoPartitions(ctx, executor, tempPartitionNames, wholeTable);
if (isCancelled.get()) {
LOG.info("insert overwrite is cancelled before
replacePartition, queryId: {}",
ctx.getQueryIdentifier());
@@ -280,13 +285,15 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
}
    /**
-     * insert into select. for sepecified temp partitions
+     * insert into select. for specified temp partitions or all partitions (the whole table).
     *
-     * @param ctx ctx
-     * @param executor executor
+     * @param ctx                ctx
+     * @param executor           executor
     * @param tempPartitionNames tempPartitionNames
+     * @param wholeTable         the overwrite target is the whole table, not partition by partition via partitions(...)
     */
-    private void insertInto(ConnectContext ctx, StmtExecutor executor, List<String> tempPartitionNames)
+    private void insertIntoPartitions(ConnectContext ctx, StmtExecutor executor, List<String> tempPartitionNames,
+            boolean wholeTable)
            throws Exception {
        // copy sink to replace by tempPartitions
UnboundLogicalSink<?> copySink;
@@ -302,9 +309,10 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
sink.isPartialUpdate(),
sink.getDMLCommandType(),
(LogicalPlan) (sink.child(0)));
-            // 1. for overwrite situation, we disable auto create partition.
+            // 1. when overwriting the whole table, whether auto partition creation is allowed is controlled by a session variable.
            // 2. we save and pass overwrite auto detect by insertCtx
-            insertCtx = new OlapInsertCommandContext(false);
+            boolean allowAutoPartition = wholeTable && ctx.getSessionVariable().isEnableAutoCreateWhenOverwrite();
+            insertCtx = new OlapInsertCommandContext(allowAutoPartition);
} else if (logicalQuery instanceof UnboundHiveTableSink) {
            UnboundHiveTableSink<?> sink = (UnboundHiveTableSink<?>) logicalQuery;
            copySink = (UnboundLogicalSink<?>) UnboundTableSinkCreator.createUnboundTableSink(
@@ -343,12 +351,13 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
* @param ctx ctx
* @param executor executor
*/
-    private void insertInto(ConnectContext ctx, StmtExecutor executor, long groupId) throws Exception {
-        // 1. for overwrite situation, we disable auto create partition.
-        // 2. we save and pass overwrite auto-detected by insertCtx
+    private void insertIntoAutoDetect(ConnectContext ctx, StmtExecutor executor, long groupId) throws Exception {
        InsertCommandContext insertCtx;
        if (logicalQuery instanceof UnboundTableSink) {
-            insertCtx = new OlapInsertCommandContext(false,
+            // 1. when overwriting with auto-detect, whether auto partition creation is allowed is controlled by a session variable.
+            // 2. we save and pass overwrite auto detect by insertCtx
+            boolean allowAutoPartition = ctx.getSessionVariable().isEnableAutoCreateWhenOverwrite();
+            insertCtx = new OlapInsertCommandContext(allowAutoPartition,
                    ((UnboundTableSink<?>) logicalQuery).isAutoDetectPartition(), groupId);
} else if (logicalQuery instanceof UnboundHiveTableSink) {
insertCtx = new HiveInsertCommandContext();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index aa4d06c93dc..5368112bf1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -646,6 +646,12 @@ public class SessionVariable implements Serializable, Writable {
    public static final String ENABLE_COOLDOWN_REPLICA_AFFINITY = "enable_cooldown_replica_affinity";
+    /**
+     * When insert overwrite targets an auto partition table, allow creating partitions for
+     * data that cannot find a partition to overwrite.
+     */
+    public static final String ENABLE_AUTO_CREATE_WHEN_OVERWRITE = "enable_auto_create_when_overwrite";
+
/**
     * If set false, user couldn't submit analyze SQL and FE won't allocate any related resources.
*/
@@ -2104,7 +2110,6 @@ public class SessionVariable implements Serializable, Writable {
})
public boolean enableFallbackOnMissingInvertedIndex = true;
-
@VariableMgr.VarAttr(name = IN_LIST_VALUE_COUNT_THRESHOLD, description = {
"in条件value数量大于这个threshold后将不会走fast_execute",
"When the number of values in the IN condition exceeds this threshold,"
@@ -2135,6 +2140,14 @@ public class SessionVariable implements Serializable, Writable {
})
public boolean requireSequenceInInsert = true;
+    @VariableMgr.VarAttr(name = ENABLE_AUTO_CREATE_WHEN_OVERWRITE, description = {
+            "开启后对自动分区表的 insert overwrite 操作会对没有找到分区的插入数据按自动分区规则创建分区,默认关闭",
+            "If enabled, an insert overwrite operation on an auto-partitioned table will create partitions"
+                    + " for inserted data for which no partition is found, according to the"
+                    + " auto-partitioning rules. Disabled by default."
+    })
+    public boolean enableAutoCreateWhenOverwrite = false;
+
    @VariableMgr.VarAttr(name = SKIP_CHECKING_ACID_VERSION_FILE, needForward = true, description = {
            "跳过检查 transactional hive 版本文件 '_orc_acid_version.'",
            "Skip checking transactional hive version file '_orc_acid_version.'"
@@ -3670,6 +3683,7 @@ public class SessionVariable implements Serializable, Writable {
tResult.setEnableAdaptivePipelineTaskSerialReadOnLimit(enableAdaptivePipelineTaskSerialReadOnLimit);
tResult.setAdaptivePipelineTaskSerialReadOnLimit(adaptivePipelineTaskSerialReadOnLimit);
tResult.setInListValueCountThreshold(inListValueCountThreshold);
+        tResult.setEnableAutoCreateWhenOverwrite(enableAutoCreateWhenOverwrite);
return tResult;
}
@@ -4291,6 +4305,10 @@ public class SessionVariable implements Serializable, Writable {
return this.maxMsgSizeOfResultReceiver;
}
+ public boolean isEnableAutoCreateWhenOverwrite() {
+ return this.enableAutoCreateWhenOverwrite;
+ }
+
public TSerdeDialect getSerdeDialect() {
switch (serdeDialect) {
case "doris":
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 831696d9ebb..b39af6efeb1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -2932,7 +2932,7 @@ public class StmtExecutor {
}
}
- private void handleIotStmt() {
+ private void handleIotStmt() throws AnalysisException {
ConnectContext.get().setSkipAuth(true);
try {
            InsertOverwriteTableStmt iotStmt = (InsertOverwriteTableStmt) this.parsedStmt;
@@ -2974,9 +2974,11 @@ public class StmtExecutor {
return;
}
// after success create table insert data
+        // when overwriting the table, whether auto partition creation is allowed is controlled by a session variable.
+        boolean allowAutoPartition = context.getSessionVariable().isEnableAutoCreateWhenOverwrite();
try {
            parsedStmt = new NativeInsertStmt(tmpTableName, null, new LabelName(iotStmt.getDb(), iotStmt.getLabel()),
-                    iotStmt.getQueryStmt(), iotStmt.getHints(), iotStmt.getCols(), true);
+                    iotStmt.getQueryStmt(), iotStmt.getHints(), iotStmt.getCols(), allowAutoPartition);
parsedStmt.setUserInfo(context.getCurrentUserIdentity());
execute();
if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
@@ -3046,6 +3048,7 @@ public class StmtExecutor {
return;
}
// after success add tmp partitions
+        // when overwriting specific partitions, auto partition creation is always disallowed.
try {
            parsedStmt = new NativeInsertStmt(targetTableName, new PartitionNames(true, tempPartitionName),
                    new LabelName(iotStmt.getDb(), iotStmt.getLabel()), iotStmt.getQueryStmt(),
@@ -3088,24 +3091,9 @@ public class StmtExecutor {
}
}
-    /*
-     * TODO: support insert overwrite auto detect partition in legacy planner
-     */
-    private void handleAutoOverwritePartition(InsertOverwriteTableStmt iotStmt) {
-        // TODO:
-        TableName targetTableName = new TableName(null, iotStmt.getDb(), iotStmt.getTbl());
-        try {
-            parsedStmt = new NativeInsertStmt(targetTableName, null, new LabelName(iotStmt.getDb(), iotStmt.getLabel()),
-                    iotStmt.getQueryStmt(), iotStmt.getHints(), iotStmt.getCols(), true).withAutoDetectOverwrite();
-            parsedStmt.setUserInfo(context.getCurrentUserIdentity());
-            execute();
-        } catch (Exception e) {
-            LOG.warn("IOT insert data error, stmt={}", parsedStmt.toSql(), e);
-            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
-            handleIotRollback(targetTableName);
-            return;
-        }
-
+    private void handleAutoOverwritePartition(InsertOverwriteTableStmt iotStmt) throws AnalysisException {
+        throw new AnalysisException(
+                "insert overwrite auto detect is not supported in legacy planner. use nereids instead");
}
private void handleIotRollback(TableName table) {
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index aada77ba258..064fb3d3de4 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -332,6 +332,8 @@ struct TQueryOptions {
132: optional i32 parallel_prepare_threshold = 0;
133: optional i32 partition_topn_max_partitions = 1024;
134: optional i32 partition_topn_pre_partition_rows = 1000;
+
+ 137: optional bool enable_auto_create_when_overwrite = false;
// For cloud, to control if the content would be written into file cache
1000: optional bool disable_file_cache = false
}
diff --git a/regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create.out b/regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create.out
new file mode 100644
index 00000000000..594c0cfabde
--- /dev/null
+++ b/regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create.out
@@ -0,0 +1,24 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !origin --
+1234567
+Beijing
+Shanghai
+list
+xxx
+
+-- !0 --
+SHANGHAI
+zzz
+
+-- !1 --
+zzz2
+
+-- !2 --
+1234567
+BEIJING
+Shanghai
+abcd
+list
+xxx
+zzz2
+
diff --git a/regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create_many.out b/regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create_many.out
new file mode 100644
index 00000000000..b52a4ecbc1a
--- /dev/null
+++ b/regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create_many.out
@@ -0,0 +1,15 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql1 --
+1234567 1
+Beijing 20000
+Shanghai 20000
+list 1
+xxx 1
+zzz 20000
+
+-- !sql2 --
+Beijing 20000
+Shanghai 20000
+yyy 20000
+zzz 20000
+
diff --git a/regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create.groovy b/regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create.groovy
new file mode 100644
index 00000000000..4d0b667dd44
--- /dev/null
+++ b/regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create.groovy
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iot_overwrite_and_create") {
+ sql "set enable_auto_create_when_overwrite = true;"
+
+ sql " drop table if exists auto_list; "
+ sql """
+ create table auto_list(
+ k0 varchar null
+ )
+ auto partition by list (k0)
+ (
+ PARTITION p1 values in (("Beijing"), ("BEIJING")),
+ PARTITION p2 values in (("Shanghai"), ("SHANGHAI")),
+ PARTITION p3 values in (("xxx"), ("XXX")),
+ PARTITION p4 values in (("list"), ("LIST")),
+ PARTITION p5 values in (("1234567"), ("7654321"))
+ )
+ DISTRIBUTED BY HASH(`k0`) BUCKETS 1
+ properties("replication_num" = "1");
+ """
+ sql """ insert into auto_list values
("Beijing"),("Shanghai"),("xxx"),("list"),("1234567"); """
+ qt_origin "select * from auto_list order by k0;"
+
+ sql """insert overwrite table auto_list values ("SHANGHAI"), ("zzz");"""
+ qt_0 "select * from auto_list order by k0;"
+ sql """insert overwrite table auto_list values ("zzz2");"""
+ qt_1 "select * from auto_list order by k0;"
+
+ test{
+ sql """insert overwrite table auto_list partition(p1, p2) values
("zzz");"""
+ exception "Insert has filtered data in strict mode."
+ }
+ test{
+ sql """insert overwrite table auto_list partition(p3) values
("zzz3");"""
+ exception "Insert has filtered data in strict mode."
+ }
+
+ sql """ insert into auto_list values
("Beijing"),("Shanghai"),("xxx"),("list"),("1234567"); """
+ sql """insert overwrite table auto_list partition(*) values ("abcd"),
("BEIJING");"""
+ qt_2 "select * from auto_list order by k0;"
+
+ sql "set enable_auto_create_when_overwrite = false;"
+ test{
+ sql """insert overwrite table auto_list values ("zzz3");"""
+ exception "Insert has filtered data in strict mode."
+ }
+ test{
+ sql """insert overwrite table auto_list partition(p1, p2) values
("zzz");"""
+ exception "Insert has filtered data in strict mode."
+ }
+ test{
+ sql """insert overwrite table auto_list partition(*) values
("zzz3");"""
+ exception "Cannot found origin partitions in auto detect overwriting"
+ }
+}
diff --git a/regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create_many.groovy b/regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create_many.groovy
new file mode 100644
index 00000000000..dcade3ce211
--- /dev/null
+++ b/regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create_many.groovy
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iot_overwrite_and_create_many") {
+ sql "set enable_auto_create_when_overwrite = true;"
+
+ sql " drop table if exists target; "
+ sql """
+ create table target(
+ k0 varchar null
+ )
+ auto partition by list (k0)
+ (
+ PARTITION p1 values in (("Beijing"), ("BEIJING")),
+ PARTITION p2 values in (("Shanghai"), ("SHANGHAI")),
+ PARTITION p3 values in (("xxx"), ("XXX")),
+ PARTITION p4 values in (("list"), ("LIST")),
+ PARTITION p5 values in (("1234567"), ("7654321"))
+ )
+ DISTRIBUTED BY HASH(`k0`) BUCKETS 2
+ properties("replication_num" = "1");
+ """
+ sql """ insert into target values
("Beijing"),("Shanghai"),("xxx"),("list"),("1234567"); """
+
+ sql " drop table if exists source; "
+ sql """
+ create table source(
+ k0 varchar null
+ )
+ DISTRIBUTED BY HASH(`k0`) BUCKETS 10
+ properties("replication_num" = "1");
+ """
+
+ sql """ insert into source select "Beijing" from numbers("number" =
"20000"); """
+ sql """ insert into source select "Shanghai" from numbers("number" =
"20000"); """
+ sql """ insert into source select "zzz" from numbers("number"= "20000");
"""
+ def result
+ result = sql " show partitions from target; "
+ logger.info("origin: ${result}")
+
+ sql " insert overwrite table target partition(*) select * from source; "
+ result = sql " show partitions from target; "
+ logger.info("changed: ${result}")
+
+ qt_sql1 " select k0, count(k0) from target group by k0 order by k0; "
+
+ sql """ insert into source select "yyy" from numbers("number" = "20000");
"""
+ sql " insert overwrite table target select * from source; "
+ qt_sql2 " select k0, count(k0) from target group by k0 order by k0; "
+}