This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new f88c44aaaeb branch-4.0: [feature](file-cache) Implement cache warm up select functionality. (#58315)
f88c44aaaeb is described below
commit f88c44aaaeb8ba193f4efe810940c68b97c54a84
Author: Qi Chen <[email protected]>
AuthorDate: Tue Nov 25 16:11:43 2025 +0800
branch-4.0: [feature](file-cache) Implement cache warm up select functionality. (#58315)
### What problem does this PR solve?
Problem Summary:
### Release note
Cherry pick #54822 #58312
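For orientation, a minimal sketch of the statement this change introduces, based on the grammar and plan nodes in the diff below; the database, table, and column names are hypothetical:

```sql
-- Scans the table and preloads the read data into the file block cache;
-- the rows themselves are discarded by the new blackhole sink.
WARM UP SELECT c1, c2 FROM db1.sales WHERE dt = '2025-11-25';
```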
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
be/src/pipeline/exec/blackhole_sink_operator.cpp | 112 ++++++++++
be/src/pipeline/exec/blackhole_sink_operator.h | 90 ++++++++
be/src/pipeline/exec/operator.cpp | 2 +
be/src/pipeline/pipeline_fragment_context.cpp | 11 +
be/src/runtime/runtime_query_statistics_mgr.cpp | 20 +-
be/src/runtime/workload_management/io_context.h | 10 +
.../workload_management/resource_context.cpp | 1 +
be/src/vec/exec/scan/file_scanner.cpp | 2 +
be/src/vec/exec/scan/olap_scanner.cpp | 2 +
.../antlr4/org/apache/doris/nereids/DorisParser.g4 | 6 +
.../org/apache/doris/analysis/SchemaTableType.java | 4 +-
.../java/org/apache/doris/catalog/SchemaTable.java | 7 +
.../doris/common/profile/ProfileManager.java | 3 +-
.../java/org/apache/doris/common/util/Util.java | 17 +-
.../nereids/analyzer/UnboundBlackholeSink.java | 158 ++++++++++++++
.../glue/translator/PhysicalPlanTranslator.java | 11 +
.../doris/nereids/parser/LogicalPlanBuilder.java | 68 ++++++
.../nereids/properties/RequestPropertyDeriver.java | 8 +
.../org/apache/doris/nereids/rules/RuleSet.java | 2 +
.../org/apache/doris/nereids/rules/RuleType.java | 2 +
.../doris/nereids/rules/analysis/BindSink.java | 25 ++-
...ogicalBlackholeSinkToPhysicalBlackholeSink.java | 43 ++++
.../apache/doris/nereids/trees/plans/PlanType.java | 5 +
.../commands/insert/AbstractInsertExecutor.java | 35 ++++
.../commands/insert/BlackholeInsertExecutor.java | 91 ++++++++
.../commands/insert/InsertIntoTableCommand.java | 42 +++-
.../trees/plans/commands/insert/InsertUtils.java | 3 +
.../plans/commands/insert/WarmupSelectCommand.java | 228 +++++++++++++++++++++
.../trees/plans/logical/LogicalBlackholeSink.java | 88 ++++++++
.../plans/physical/PhysicalBlackholeSink.java | 144 +++++++++++++
.../nereids/trees/plans/visitor/SinkVisitor.java | 17 ++
.../org/apache/doris/planner/BlackholeSink.java | 61 ++++++
.../WorkloadRuntimeStatusMgr.java | 54 +++--
.../doris/nereids/parser/NereidsParserTest.java | 27 +++
gensrc/proto/data.proto | 1 +
gensrc/thrift/DataSinks.thrift | 5 +
gensrc/thrift/Descriptors.thrift | 1 +
gensrc/thrift/FrontendService.thrift | 9 +-
.../warm_up_select/test_warmup_select.groovy | 113 ++++++++++
.../cache/test_hive_warmup_select.groovy | 156 ++++++++++++++
40 files changed, 1646 insertions(+), 38 deletions(-)
diff --git a/be/src/pipeline/exec/blackhole_sink_operator.cpp b/be/src/pipeline/exec/blackhole_sink_operator.cpp
new file mode 100644
index 00000000000..ca23b8b1fbe
--- /dev/null
+++ b/be/src/pipeline/exec/blackhole_sink_operator.cpp
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "blackhole_sink_operator.h"
+
+#include <fmt/format.h>
+#include <gen_cpp/PaloInternalService_types.h>
+
+#include <sstream>
+
+#include "common/logging.h"
+#include "common/status.h"
+#include "pipeline/dependency.h"
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+
+namespace doris {
+namespace pipeline {
+
+BlackholeSinkOperatorX::BlackholeSinkOperatorX(int operator_id) : Base(operator_id, 0, 0) {}
+
+Status BlackholeSinkOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(DataSinkOperatorXBase::prepare(state));
+ return Status::OK();
+}
+
+Status BlackholeSinkOperatorX::init(const TDataSink& tsink) {
+ RETURN_IF_ERROR(DataSinkOperatorXBase::init(tsink));
+ return Status::OK();
+}
+
+Status BlackholeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) {
+ auto& local_state = get_local_state(state);
+ SCOPED_TIMER(local_state.exec_time_counter());
+ COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
+
+ if (block && block->rows() > 0) {
+ RETURN_IF_ERROR(_process_block(state, block));
+ }
+
+ return Status::OK();
+}
+
+Status BlackholeSinkOperatorX::_process_block(RuntimeState* state, vectorized::Block* block) {
+ auto& local_state = get_local_state(state);
+
+ // Update metrics - count rows and bytes processed
+ local_state._rows_processed += block->rows();
+ local_state._bytes_processed += block->bytes();
+
+ // Update runtime counters
+ if (local_state._rows_processed_timer) {
+ COUNTER_UPDATE(local_state._rows_processed_timer, block->rows());
+ }
+ if (local_state._bytes_processed_timer) {
+ COUNTER_UPDATE(local_state._bytes_processed_timer, block->bytes());
+ }
+
+ // The BLACKHOLE discards the data
+ // We don't write the block anywhere - it's effectively sent to /dev/null
+ return Status::OK();
+}
+
+Status BlackholeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
+ RETURN_IF_ERROR(Base::init(state, info));
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_init_timer);
+
+ // Initialize performance counters
+ _rows_processed_timer = ADD_COUNTER(custom_profile(), "RowsProcessed",
TUnit::UNIT);
+ _bytes_processed_timer = ADD_COUNTER(custom_profile(), "BytesProcessed",
TUnit::BYTES);
+
+ return Status::OK();
+}
+
+Status BlackholeSinkLocalState::open(RuntimeState* state) {
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_open_timer);
+ RETURN_IF_ERROR(Base::open(state));
+
+ return Status::OK();
+}
+
+Status BlackholeSinkLocalState::close(RuntimeState* state, Status exec_status) {
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_close_timer);
+
+ return Base::close(state, exec_status);
+}
+
+Status BlackholeSinkOperatorX::close(RuntimeState* state) {
+ return Status::OK();
+}
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/exec/blackhole_sink_operator.h b/be/src/pipeline/exec/blackhole_sink_operator.h
new file mode 100644
index 00000000000..1acf6074d3e
--- /dev/null
+++ b/be/src/pipeline/exec/blackhole_sink_operator.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/PlanNodes_types.h>
+#include <gen_cpp/Types_types.h>
+#include <stdint.h>
+
+#include <memory>
+
+#include "operator.h"
+#include "vec/core/block.h"
+
+namespace doris {
+
+class TDataSink;
+
+namespace vectorized {
+class Block;
+}
+
+namespace pipeline {
+
+// Forward declaration
+class BlackholeSinkOperatorX;
+
+class BlackholeSinkLocalState final : public PipelineXSinkLocalState<FakeSharedState> {
+ ENABLE_FACTORY_CREATOR(BlackholeSinkLocalState);
+
+public:
+ using Parent = BlackholeSinkOperatorX;
+ using Base = PipelineXSinkLocalState<FakeSharedState>;
+ BlackholeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+ : Base(parent, state) {}
+
+ Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+ Status open(RuntimeState* state) override;
+ Status close(RuntimeState* state, Status exec_status) override;
+
+ int64_t _rows_processed = 0;
+ int64_t _bytes_processed = 0;
+
+ RuntimeProfile::Counter* _rows_processed_timer = nullptr;
+ RuntimeProfile::Counter* _bytes_processed_timer = nullptr;
+
+private:
+ friend class BlackholeSinkOperatorX;
+};
+
+class BlackholeSinkOperatorX final : public DataSinkOperatorX<BlackholeSinkLocalState> {
+public:
+ using Base = DataSinkOperatorX<BlackholeSinkLocalState>;
+
+ BlackholeSinkOperatorX(int operator_id);
+
+ Status prepare(RuntimeState* state) override;
+
+ Status init(const TDataSink& tsink) override;
+
+ Status sink(RuntimeState* state, vectorized::Block* block, bool eos) override;
+
+ Status close(RuntimeState* state) override;
+
+private:
+ friend class BlackholeSinkLocalState;
+
+ /**
+ * Process a data block by discarding it and collecting metrics.
+ * This simulates a "/dev/null" sink - data goes in but nothing comes out.
+ */
+ Status _process_block(RuntimeState* state, vectorized::Block* block);
+};
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 6f0eb562f0f..2b2123849ea 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -24,6 +24,7 @@
#include "pipeline/exec/analytic_sink_operator.h"
#include "pipeline/exec/analytic_source_operator.h"
#include "pipeline/exec/assert_num_rows_operator.h"
+#include "pipeline/exec/blackhole_sink_operator.h"
#include "pipeline/exec/cache_sink_operator.h"
#include "pipeline/exec/cache_source_operator.h"
#include "pipeline/exec/datagen_operator.h"
@@ -783,6 +784,7 @@ DECLARE_OPERATOR(OlapTableSinkV2LocalState)
DECLARE_OPERATOR(HiveTableSinkLocalState)
DECLARE_OPERATOR(IcebergTableSinkLocalState)
DECLARE_OPERATOR(AnalyticSinkLocalState)
+DECLARE_OPERATOR(BlackholeSinkLocalState)
DECLARE_OPERATOR(SortSinkLocalState)
DECLARE_OPERATOR(SpillSortSinkLocalState)
DECLARE_OPERATOR(LocalExchangeSinkLocalState)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index bd0ce44d78d..38704bf10a0 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -46,6 +46,7 @@
#include "pipeline/exec/analytic_sink_operator.h"
#include "pipeline/exec/analytic_source_operator.h"
#include "pipeline/exec/assert_num_rows_operator.h"
+#include "pipeline/exec/blackhole_sink_operator.h"
#include "pipeline/exec/cache_sink_operator.h"
#include "pipeline/exec/cache_source_operator.h"
#include "pipeline/exec/datagen_operator.h"
@@ -104,6 +105,8 @@
#include "pipeline_task.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
+#include "runtime/result_block_buffer.h"
+#include "runtime/result_buffer_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/thread_context.h"
@@ -1163,6 +1166,14 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
}
break;
}
+ case TDataSinkType::BLACKHOLE_SINK: {
+ if (!thrift_sink.__isset.blackhole_sink) {
+ return Status::InternalError("Missing blackhole sink.");
+ }
+
+ _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
+ break;
+ }
default:
return Status::InternalError("Unsuported sink type in pipeline: {}",
thrift_sink.type);
}
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp
index e1f97b8cdc0..b08f8705b6c 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -338,7 +338,8 @@ void RuntimeQueryStatisticsMgr::register_resource_context(
void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id;
// 1 get query statistics map
- std::map<TNetworkAddress, std::map<std::string, TQueryStatistics>> fe_qs_map;
+ // <fe_addr, <query_id, <query_statistics, is_query_finished>>>
+ std::map<TNetworkAddress, std::map<std::string, std::pair<TQueryStatistics, bool>>> fe_qs_map;
std::map<std::string, std::pair<bool, bool>> qs_status; // <finished, timeout>
{
std::lock_guard<std::shared_mutex> write_lock(_resource_contexts_map_lock);
@@ -364,13 +365,14 @@ void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
if (resource_ctx->task_controller()->query_type() != TQueryType::EXTERNAL) {
if (fe_qs_map.find(resource_ctx->task_controller()->fe_addr()) == fe_qs_map.end()) {
- std::map<std::string, TQueryStatistics> tmp_map;
+ std::map<std::string, std::pair<TQueryStatistics, bool>> tmp_map;
fe_qs_map[resource_ctx->task_controller()->fe_addr()] = std::move(tmp_map);
}
TQueryStatistics ret_t_qs;
resource_ctx->to_thrift_query_statistics(&ret_t_qs);
- fe_qs_map.at(resource_ctx->task_controller()->fe_addr())[query_id] = ret_t_qs;
+ fe_qs_map.at(resource_ctx->task_controller()->fe_addr())[query_id] =
+ std::make_pair(ret_t_qs, is_query_finished);
qs_status[query_id] = std::make_pair(is_query_finished, is_timeout_after_finish);
}
@@ -407,7 +409,17 @@ void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
// 2.2 send report
TReportWorkloadRuntimeStatusParams report_runtime_params;
report_runtime_params.__set_backend_id(be_id);
- report_runtime_params.__set_query_statistics_map(qs_map);
+
+ // Build the query statistics map with TQueryStatisticsResult
+ std::map<std::string, TQueryStatisticsResult> query_stats_result_map;
+ for (const auto& [query_id, query_stats_pair] : qs_map) {
+ TQueryStatisticsResult stats_result;
+ stats_result.__set_statistics(query_stats_pair.first); // TQueryStatistics
+ stats_result.__set_query_finished(query_stats_pair.second); // is_query_finished
+ query_stats_result_map[query_id] = stats_result;
+ }
+
+ report_runtime_params.__set_query_statistics_result_map(query_stats_result_map);
TReportExecStatusParams params;
params.__set_report_workload_runtime_status(report_runtime_params);
diff --git a/be/src/runtime/workload_management/io_context.h b/be/src/runtime/workload_management/io_context.h
index f66e8ac6b6d..5c4cf33a3a2 100644
--- a/be/src/runtime/workload_management/io_context.h
+++ b/be/src/runtime/workload_management/io_context.h
@@ -40,6 +40,8 @@ public:
RuntimeProfile::Counter* scan_bytes_counter_;
RuntimeProfile::Counter* scan_bytes_from_local_storage_counter_;
RuntimeProfile::Counter* scan_bytes_from_remote_storage_counter_;
+ RuntimeProfile::Counter* bytes_write_into_cache_counter_;
+
// number rows returned by query.
// only set once by result sink when closing.
RuntimeProfile::Counter* returned_rows_counter_;
@@ -58,6 +60,8 @@ public:
ADD_COUNTER(profile_, "ScanBytesFromLocalStorage",
TUnit::BYTES);
scan_bytes_from_remote_storage_counter_ =
ADD_COUNTER(profile_, "ScanBytesFromRemoteStorage",
TUnit::BYTES);
+ bytes_write_into_cache_counter_ =
+ ADD_COUNTER(profile_, "BytesWriteIntoCache", TUnit::BYTES);
returned_rows_counter_ = ADD_COUNTER(profile_, "ReturnedRows",
TUnit::UNIT);
shuffle_send_bytes_counter_ = ADD_COUNTER(profile_,
"ShuffleSendBytes", TUnit::BYTES);
shuffle_send_rows_counter_ =
@@ -86,6 +90,9 @@ public:
int64_t scan_bytes_from_remote_storage() const {
return stats_.scan_bytes_from_remote_storage_counter_->value();
}
+ int64_t bytes_write_into_cache() const {
+ return stats_.bytes_write_into_cache_counter_->value();
+ }
int64_t returned_rows() const { return stats_.returned_rows_counter_->value(); }
int64_t shuffle_send_bytes() const { return stats_.shuffle_send_bytes_counter_->value(); }
int64_t shuffle_send_rows() const { return stats_.shuffle_send_rows_counter_->value(); }
@@ -106,6 +113,9 @@ public:
void update_scan_bytes_from_remote_storage(int64_t delta) const {
stats_.scan_bytes_from_remote_storage_counter_->update(delta);
}
+ void update_bytes_write_into_cache(int64_t delta) const {
+ stats_.bytes_write_into_cache_counter_->update(delta);
+ }
void update_returned_rows(int64_t delta) const { stats_.returned_rows_counter_->update(delta); }
void update_shuffle_send_bytes(int64_t delta) const {
stats_.shuffle_send_bytes_counter_->update(delta);
diff --git a/be/src/runtime/workload_management/resource_context.cpp b/be/src/runtime/workload_management/resource_context.cpp
index 4cab0afb449..c1b0fd2b744 100644
--- a/be/src/runtime/workload_management/resource_context.cpp
+++ b/be/src/runtime/workload_management/resource_context.cpp
@@ -38,6 +38,7 @@ void ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics) c
statistics->__set_scan_bytes_from_remote_storage(
io_context()->scan_bytes_from_remote_storage());
statistics->__set_scan_bytes_from_local_storage(io_context()->scan_bytes_from_local_storage());
+ statistics->__set_bytes_write_into_cache(io_context()->bytes_write_into_cache());
if (workload_group() != nullptr) {
statistics->__set_workload_group_id(workload_group()->id());
diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp
index 2a34972f05f..2e1731aa36e 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -1810,6 +1810,8 @@ void FileScanner::_collect_profile_before_close() {
_profile != nullptr) {
io::FileCacheProfileReporter cache_profile(_profile);
cache_profile.update(_file_cache_statistics.get());
+ _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
+ _file_cache_statistics->bytes_write_into_cache);
}
if (_cur_reader != nullptr) {
diff --git a/be/src/vec/exec/scan/olap_scanner.cpp b/be/src/vec/exec/scan/olap_scanner.cpp
index c0c0c35f0be..e60bf769f2a 100644
--- a/be/src/vec/exec/scan/olap_scanner.cpp
+++ b/be/src/vec/exec/scan/olap_scanner.cpp
@@ -777,6 +777,8 @@ void OlapScanner::_collect_profile_before_close() {
if (config::is_cloud_mode() && config::enable_file_cache) {
io::FileCacheProfileReporter cache_profile(local_state->_segment_profile.get());
cache_profile.update(&stats.file_cache_stats);
+ _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
+ stats.file_cache_stats.bytes_write_into_cache);
}
COUNTER_UPDATE(local_state->_output_index_result_column_timer, stats.output_index_result_column_timer);
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index ef036f97134..9ca29f37594 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -503,6 +503,8 @@ supportedOtherStatement
((CLUSTER | COMPUTE GROUP) source=identifier | (warmUpItem (AND warmUpItem)*)) FORCE?
properties=propertyClause? #warmUpCluster
+ | explain? WARM UP SELECT namedExpressionSeq
+ FROM warmUpSingleTableRef whereClause? #warmUpSelect
| BACKUP SNAPSHOT label=multipartIdentifier TO repo=identifier
((ON | EXCLUDE) LEFT_PAREN baseTableRef (COMMA baseTableRef)* RIGHT_PAREN)?
properties=propertyClause? #backup
@@ -513,6 +515,10 @@ supportedOtherStatement
: TABLE tableName=multipartIdentifier (PARTITION partitionName=identifier)?
;
+warmUpSingleTableRef
+ : multipartIdentifier tableAlias?
+ ;
+
lockTable
: name=multipartIdentifier (AS alias=identifierOrText)?
(READ (LOCAL)? | (LOW_PRIORITY)? WRITE)
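Reading the new rule above: the statement takes an optional EXPLAIN prefix, a projection list, a single table reference with an optional alias, and an optional WHERE clause. A sketch of the accepted forms, with hypothetical identifiers:

```sql
WARM UP SELECT * FROM ctl.db1.t1;
WARM UP SELECT c1, c2 FROM db1.t1 AS t WHERE t.c1 > 0;
EXPLAIN WARM UP SELECT c1 FROM t1;
```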
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
index 85f16ab6d15..aafcae4a913 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
@@ -104,7 +104,9 @@ public enum SchemaTableType {
TSchemaTableType.SCH_ENCRYPTION_KEYS),
SCH_CLUSTER_SNAPSHOTS("CLUSTER_SNAPSHOTS", "CLUSTER_SNAPSHOTS",
TSchemaTableType.SCH_CLUSTER_SNAPSHOTS),
SCH_CLUSTER_SNAPSHOT_PROPERTIES("CLUSTER_SNAPSHOT_PROPERTIES",
"CLUSTER_SNAPSHOT_PROPERTIES",
- TSchemaTableType.SCH_CLUSTER_SNAPSHOT_PROPERTIES);
+ TSchemaTableType.SCH_CLUSTER_SNAPSHOT_PROPERTIES),
+ SCH_BLACKHOLE("BLACKHOLE", "BLACKHOLE",
+ TSchemaTableType.SCH_BLACKHOLE);
private static final String dbName = "INFORMATION_SCHEMA";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 37b3fdd8d26..deb8aac86a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -51,6 +51,8 @@ public class SchemaTable extends Table {
private static final int PRIVILEGE_TYPE_LEN = 64;
private static final int IS_GRANTABLE_LEN = 3;
+ public static final String BLACKHOLE_TABLE_NAME = "blackhole";
+
// Now we just mock tables, table_privileges, referential_constraints, key_column_usage and routines table
// Because in MySQL ODBC, these tables are used.
// TODO(zhaochun): Review some commercial BI to check if we need support where clause in show statement
@@ -689,6 +691,11 @@ public class SchemaTable extends Table {
.column("REF_TYPE",
ScalarType.createVarchar(NAME_CHAR_LEN))
.build())
)
+ .put(BLACKHOLE_TABLE_NAME,
+ new SchemaTable(SystemIdGenerator.getNextId(), BLACKHOLE_TABLE_NAME, TableType.SCHEMA,
+ builder().column("VERSION", ScalarType.createType(PrimitiveType.INT))
+ .build())
+ )
.put("encryption_keys",
new SchemaTable(SystemIdGenerator.getNextId(),
"encryption_keys", TableType.SCHEMA,
builder().column("ID", ScalarType.createStringType())
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
index 81bdbbbf005..b9fcc581cf0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
@@ -396,7 +396,8 @@ public class ProfileManager extends MasterDaemon {
} else {
LOG.warn("Failed to get real-time query stats, id {}, resp
is {}",
queryId, resp == null ? "null" : resp.toString());
- throw new Exception("Failed to get realtime query stats: "
+ resp.toString());
+ throw new Exception("Failed to get realtime query stats: "
+ + (resp == null ? "null" : resp.toString()));
}
} catch (Exception e) {
LOG.warn("Failed to get real-time query stats, id {}, error:
{}", queryId, e.getMessage(), e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
index dcde71a7a6a..4ac9341af96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
@@ -654,16 +654,19 @@ public class Util {
public static String getRootCauseMessage(Throwable t) {
String rootCause = "unknown";
+ if (t == null) {
+ return rootCause;
+ }
Throwable p = t;
- while (p != null) {
- String message = p.getMessage();
- if (message == null) {
- rootCause = p.getClass().getName();
- } else {
- rootCause = p.getClass().getName() + ": " + p.getMessage();
- }
+ while (p.getCause() != null) {
p = p.getCause();
}
+ String message = p.getMessage();
+ if (message == null) {
+ rootCause = p.getClass().getName();
+ } else {
+ rootCause = p.getClass().getName() + ": " + message;
+ }
return rootCause;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundBlackholeSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundBlackholeSink.java
new file mode 100644
index 00000000000..4a3b76820e5
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundBlackholeSink.java
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.analyzer;
+
+import org.apache.doris.catalog.InfoSchemaDb;
+import org.apache.doris.catalog.SchemaTable;
+import org.apache.doris.nereids.exceptions.UnboundException;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.BlockFuncDepsPropagation;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.Sink;
+import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Unbound black hole sink.
+ * The blackhole sink is currently used in "warm up select" SQL statements to preload table data into the file block cache.
+ * It is planned as a terminal sink (like /dev/null),
+ * meaning a "black hole" at the end of the execution plan that discards all incoming data.
+ */
+public class UnboundBlackholeSink<CHILD_TYPE extends Plan> extends UnboundLogicalSink<CHILD_TYPE>
+ implements Unbound, Sink, BlockFuncDepsPropagation {
+
+ /**
+ * UnboundBlackholeSink Context
+ */
+ public static class UnboundBlackholeSinkContext {
+ private boolean isForWarmUp = false;
+
+ public UnboundBlackholeSinkContext(boolean isForWarmUp) {
+ this.isForWarmUp = isForWarmUp;
+ }
+
+ public boolean isForWarmUp() {
+ return isForWarmUp;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ UnboundBlackholeSinkContext that = (UnboundBlackholeSinkContext) o;
+ return isForWarmUp == that.isForWarmUp;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(isForWarmUp);
+ }
+ }
+
+ private UnboundBlackholeSinkContext context;
+
+ /**
+ * create unbound sink for blackhole sink
+ */
+ public UnboundBlackholeSink(CHILD_TYPE child, UnboundBlackholeSinkContext context) {
+ super(ImmutableList.of(InfoSchemaDb.DATABASE_NAME, SchemaTable.BLACKHOLE_TABLE_NAME),
+ PlanType.LOGICAL_UNBOUND_BLACKHOLE_SINK,
+ ImmutableList.of(),
+ Optional.empty(),
+ Optional.empty(),
+ ImmutableList.of(),
+ DMLCommandType.INSERT,
+ child);
+ this.context = context;
+ }
+
+ /**
+ * create unbound sink for blackhole sink
+ */
+ public UnboundBlackholeSink(Optional<GroupExpression> groupExpression,
+ Optional<LogicalProperties> logicalProperties, CHILD_TYPE child, UnboundBlackholeSinkContext context) {
+ super(ImmutableList.of(InfoSchemaDb.DATABASE_NAME, SchemaTable.BLACKHOLE_TABLE_NAME),
+ PlanType.LOGICAL_UNBOUND_BLACKHOLE_SINK,
+ ImmutableList.of(),
+ groupExpression,
+ logicalProperties,
+ ImmutableList.of(),
+ DMLCommandType.INSERT,
+ child);
+ this.context = context;
+ }
+
+ public UnboundBlackholeSinkContext getContext() {
+ return context;
+ }
+
+ @Override
+ public Plan withChildren(List<Plan> children) {
+ Preconditions.checkArgument(children.size() == 1, "UnboundBlackholeSink only accepts one child");
+ return new UnboundBlackholeSink<>(groupExpression, Optional.empty(), children.get(0), context);
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitUnboundBlackholeSink(this, context);
+ }
+
+ @Override
+ public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
+ return new UnboundBlackholeSink<>(groupExpression, Optional.of(getLogicalProperties()), child(), context);
+ }
+
+ @Override
+ public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
+ Optional<LogicalProperties> logicalProperties, List<Plan> children) {
+ Preconditions.checkArgument(children.size() == 1, "UnboundBlackholeSink only accepts one child");
+ return new UnboundBlackholeSink<>(groupExpression, logicalProperties, children.get(0), context);
+ }
+
+ @Override
+ public UnboundBlackholeSink<CHILD_TYPE> withOutputExprs(List<NamedExpression> outputExprs) {
+ throw new UnboundException("could not call withOutputExprs on UnboundBlackholeSink");
+ }
+
+ @Override
+ public List<Slot> computeOutput() {
+ throw new UnboundException("output");
+ }
+
+ @Override
+ public String toString() {
+ return Utils.toSqlString("UnboundBlackholeSink[" + id.asInt() + "]");
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index a6a2d6cc08c..8a8a342ccf0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -121,6 +121,7 @@ import org.apache.doris.nereids.trees.plans.algebra.Relation;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalBlackholeSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
@@ -182,6 +183,7 @@ import org.apache.doris.planner.AggregationNode;
import org.apache.doris.planner.AnalyticEvalNode;
import org.apache.doris.planner.AssertNumRowsNode;
import org.apache.doris.planner.BackendPartitionedSchemaScanNode;
+import org.apache.doris.planner.BlackholeSink;
import org.apache.doris.planner.CTEScanNode;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataStreamSink;
@@ -443,6 +445,15 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
* sink Node, in lexicographical order
* ********************************************************************************************
*/
+ @Override
+ public PlanFragment visitPhysicalBlackholeSink(PhysicalBlackholeSink<? extends Plan> physicalBlackholeSink,
+ PlanTranslatorContext context) {
+ PlanFragment planFragment = physicalBlackholeSink.child().accept(this, context);
+ BlackholeSink blackholeSink = new BlackholeSink(planFragment.getPlanRoot().getId());
+ planFragment.setSink(blackholeSink);
+ return planFragment;
+ }
+
@Override
public PlanFragment visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink,
PlanTranslatorContext context) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index c1e26372cd0..98caca0ce37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -477,6 +477,8 @@ import org.apache.doris.nereids.DorisParser.WithRemoteStorageSystemContext;
import org.apache.doris.nereids.DorisParserBaseVisitor;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundBlackholeSink;
+import org.apache.doris.nereids.analyzer.UnboundBlackholeSink.UnboundBlackholeSinkContext;
import org.apache.doris.nereids.analyzer.UnboundFunction;
import org.apache.doris.nereids.analyzer.UnboundInlineTable;
import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
@@ -987,6 +989,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.WarmUpItem;
import org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.insert.WarmupSelectCommand;
import org.apache.doris.nereids.trees.plans.commands.load.CreateRoutineLoadCommand;
import org.apache.doris.nereids.trees.plans.commands.load.LoadColumnClause;
import org.apache.doris.nereids.trees.plans.commands.load.LoadColumnDesc;
@@ -8736,6 +8739,71 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
return new WarmUpClusterCommand(warmUpItems, srcCluster, dstCluster, isForce, isWarmUpWithTable, properties);
}
+ @Override
+ public LogicalPlan visitWarmUpSelect(DorisParser.WarmUpSelectContext ctx) {
+ LogicalPlan relation = visitWarmUpSingleTableRef(ctx.warmUpSingleTableRef());
+
+ LogicalPlan filter = withFilter(relation, Optional.ofNullable(ctx.whereClause()));
+
+ List<Expression> projectList = visitNamedExpressionSeq(ctx.namedExpressionSeq());
+
+ for (Expression expr : projectList) {
+ if (!isSimpleColumnReference(expr)) {
+ throw new AnalysisException("WARM UP SELECT only supports
simple column references, "
+ + "aggregate functions and complex expressions are not
allowed");
+ }
+ }
+
+ LogicalProject project = new LogicalProject(projectList, filter);
+
+ if (Config.isNotCloudMode() && (!ConnectContext.get().getSessionVariable().isEnableFileCache())) {
+ throw new AnalysisException("WARM UP SELECT requires session variable"
+ + " enable_file_cache=true");
+ }
+
+ if (Config.isCloudMode() && ConnectContext.get().getSessionVariable().isDisableFileCache()) {
+ throw new AnalysisException("WARM UP SELECT requires session variable"
+ + " disable_file_cache=false in cloud mode");
+ }
+
+ UnboundBlackholeSink<?> sink = new UnboundBlackholeSink<>(project,
+ new UnboundBlackholeSinkContext(true));
+ LogicalPlan command = new WarmupSelectCommand(sink);
+ return withExplain(command, ctx.explain());
+ }
+
+ @Override
+ public LogicalPlan visitWarmUpSingleTableRef(DorisParser.WarmUpSingleTableRefContext ctx) {
+ List<String> nameParts = visitMultipartIdentifier(ctx.multipartIdentifier());
+
+ // Create a simple UnboundRelation for warm up queries
+ UnboundRelation relation = new UnboundRelation(
+ StatementScopeIdGenerator.newRelationId(),
+ nameParts);
+
+ LogicalPlan checkedRelation = LogicalPlanBuilderAssistant.withCheckPolicy(relation);
+ LogicalPlan plan = withTableAlias(checkedRelation, ctx.tableAlias());
+ return plan;
+ }
+
+ /**
+ * Check if an expression is a simple column reference (not aggregate functions or complex expressions)
+ */
+ private boolean isSimpleColumnReference(Expression expr) {
+ // Allow simple column references
+ if (expr instanceof Slot) {
+ return true;
+ }
+
+ // Allow star expressions (*)
+ if (expr instanceof UnboundStar) {
+ return true;
+ }
+
+ // Reject everything else (including function calls, arithmetic expressions, etc.)
+ return false;
+ }
+
@Override
public LogicalPlan visitCreateResource(DorisParser.CreateResourceContext ctx) {
String resourceName = visitIdentifierOrText(ctx.name);
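Per visitWarmUpSelect above, the projection may contain only bare columns or *, and the file cache must be enabled (enable_file_cache=true outside cloud mode, disable_file_cache=false in cloud mode). A sketch of what that validation accepts and rejects, assuming a hypothetical table db1.t1:

```sql
SET enable_file_cache = true;            -- required in non-cloud mode
WARM UP SELECT c1, c2 FROM db1.t1;       -- accepted: simple column references
WARM UP SELECT * FROM db1.t1;            -- accepted: star expression
-- WARM UP SELECT sum(c1) FROM db1.t1;   -- rejected: aggregates and expressions are not allowed
```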
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
index 05ed2760307..2322b39a62b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
@@ -38,6 +38,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalBlackholeSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDictionarySink;
@@ -131,6 +132,13 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
* sink Node, in lexicographical order
* ********************************************************************************************
*/
+ @Override
+ public Void visitPhysicalBlackholeSink(PhysicalBlackholeSink<? extends Plan> sink, PlanContext context) {
+ // Blackhole sink needs parallel instances
+ addRequestPropertyToChildren(PhysicalProperties.ANY);
+ return null;
+ }
+
@Override
public Void visitPhysicalOlapTableSink(PhysicalOlapTableSink<? extends Plan> olapTableSink, PlanContext context) {
if (connectContext != null && !connectContext.getSessionVariable().enableStrictConsistencyDml) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
index f361348ccad..15d67d2e696 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
@@ -53,6 +53,7 @@ import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewWindowScanR
import org.apache.doris.nereids.rules.expression.ExpressionNormalizationAndOptimization;
import org.apache.doris.nereids.rules.implementation.AggregateStrategies;
import org.apache.doris.nereids.rules.implementation.LogicalAssertNumRowsToPhysicalAssertNumRows;
+import org.apache.doris.nereids.rules.implementation.LogicalBlackholeSinkToPhysicalBlackholeSink;
import org.apache.doris.nereids.rules.implementation.LogicalCTEAnchorToPhysicalCTEAnchor;
import org.apache.doris.nereids.rules.implementation.LogicalCTEConsumerToPhysicalCTEConsumer;
import org.apache.doris.nereids.rules.implementation.LogicalCTEProducerToPhysicalCTEProducer;
@@ -223,6 +224,7 @@ public class RuleSet {
.add(new LogicalResultSinkToPhysicalResultSink())
.add(new LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink())
.add(new LogicalDictionarySinkToPhysicalDictionarySink())
+ .add(new LogicalBlackholeSinkToPhysicalBlackholeSink())
.build();
// left-zig-zag tree is used when column stats are not available.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 6e275ec8263..87f9da44f3b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -35,6 +35,7 @@ public enum RuleType {
// binding rules
BINDING_RESULT_SINK(RuleTypeClass.REWRITE),
+ BINDING_INSERT_BLACKHOLE_SINK(RuleTypeClass.REWRITE),
BINDING_INSERT_HIVE_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_ICEBERG_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_JDBC_TABLE(RuleTypeClass.REWRITE),
@@ -518,6 +519,7 @@ public enum RuleType {
LOGICAL_JDBC_SCAN_TO_PHYSICAL_JDBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ODBC_SCAN_TO_PHYSICAL_ODBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
+ LOGICAL_BLACKHOLE_SINK_TO_PHYSICAL_BLACKHOLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ICEBERG_TABLE_SINK_TO_PHYSICAL_ICEBERG_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
index bb75f846d51..34462634f44 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.Config;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
@@ -40,6 +41,7 @@ import org.apache.doris.dictionary.Dictionary;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.Scope;
+import org.apache.doris.nereids.analyzer.UnboundBlackholeSink;
import org.apache.doris.nereids.analyzer.UnboundDictionarySink;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
@@ -68,11 +70,13 @@ import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalBlackholeSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalDictionarySink;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
@@ -142,8 +146,9 @@ public class BindSink implements AnalysisRuleFactory {
unboundIcebergTableSink().thenApply(this::bindIcebergTableSink)),
RuleType.BINDING_INSERT_JDBC_TABLE.build(unboundJdbcTableSink().thenApply(this::bindJdbcTableSink)),
RuleType.BINDING_INSERT_DICTIONARY_TABLE
- .build(unboundDictionarySink().thenApply(this::bindDictionarySink))
- );
+ .build(unboundDictionarySink().thenApply(this::bindDictionarySink)),
+ RuleType.BINDING_INSERT_BLACKHOLE_SINK.build(unboundBlackholeSink().thenApply(this::bindBlackHoleSink))
+ );
}
private Plan bindOlapTableSink(MatchingContext<UnboundTableSink<Plan>> ctx) {
@@ -508,6 +513,22 @@ public class BindSink implements AnalysisRuleFactory {
return columnToOutput;
}
+ private Plan bindBlackHoleSink(MatchingContext<UnboundBlackholeSink<Plan>> ctx) {
+ UnboundBlackholeSink<?> sink = ctx.root;
+ LogicalPlan child = ((LogicalPlan) sink.child());
+ if (sink.getContext().isForWarmUp() && Config.isNotCloudMode() && child.containsType(LogicalOlapScan.class)) {
+ throw new AnalysisException("WARM UP SELECT doesn't support olap table in non-cloud mode.");
+ }
+ LogicalBlackholeSink<?> boundSink = new LogicalBlackholeSink<>(
+ child.getOutput().stream()
+ .map(NamedExpression.class::cast)
+ .collect(ImmutableList.toImmutableList()),
+ Optional.empty(),
+ Optional.empty(),
+ child);
+ return boundSink;
+ }
+
private Plan bindHiveTableSink(MatchingContext<UnboundHiveTableSink<Plan>> ctx) {
UnboundHiveTableSink<?> sink = ctx.root;
Pair<HMSExternalDatabase, HMSExternalTable> pair = bind(ctx.cascadesContext, sink);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalBlackholeSinkToPhysicalBlackholeSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalBlackholeSinkToPhysicalBlackholeSink.java
new file mode 100644
index 00000000000..389e914a696
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalBlackholeSinkToPhysicalBlackholeSink.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalBlackholeSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalBlackholeSink;
+
+import java.util.Optional;
+
+/**
+ * implement blackhole sink.
+ */
+public class LogicalBlackholeSinkToPhysicalBlackholeSink extends OneImplementationRuleFactory {
+ @Override
+ public Rule build() {
+ return logicalBlackholeSink().thenApply(ctx -> {
+ LogicalBlackholeSink<? extends Plan> sink = ctx.root;
+ return new PhysicalBlackholeSink<>(
+ sink.getOutputExprs(),
+ Optional.empty(),
+ sink.getLogicalProperties(),
+ sink.child());
+ }).toRule(RuleType.LOGICAL_BLACKHOLE_SINK_TO_PHYSICAL_BLACKHOLE_SINK_RULE);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index f2e88c1df7d..181b7e4083a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -52,6 +52,7 @@ public enum PlanType {
LOGICAL_ICEBERG_TABLE_SINK,
LOGICAL_JDBC_TABLE_SINK,
LOGICAL_RESULT_SINK,
+ LOGICAL_BLACKHOLE_SINK,
LOGICAL_DICTIONARY_SINK,
LOGICAL_UNBOUND_OLAP_TABLE_SINK,
LOGICAL_UNBOUND_HIVE_TABLE_SINK,
@@ -59,6 +60,7 @@ public enum PlanType {
LOGICAL_UNBOUND_JDBC_TABLE_SINK,
LOGICAL_UNBOUND_RESULT_SINK,
LOGICAL_UNBOUND_DICTIONARY_SINK,
+ LOGICAL_UNBOUND_BLACKHOLE_SINK,
// logical others
LOGICAL_AGGREGATE,
@@ -113,6 +115,7 @@ public enum PlanType {
PHYSICAL_ICEBERG_TABLE_SINK,
PHYSICAL_JDBC_TABLE_SINK,
PHYSICAL_RESULT_SINK,
+ PHYSICAL_BLACKHOLE_SINK,
PHYSICAL_DICTIONARY_SINK,
// physical others
@@ -153,6 +156,8 @@ public enum PlanType {
EXPORT_COMMAND,
INSERT_INTO_TABLE_COMMAND,
INSERT_INTO_DICTIONARY_COMMAND,
+ INSERT_INTO_BLACKHOLE_COMMAND,
+ WARMUP_SELECT_COMMAND,
BATCH_INSERT_INTO_TABLE_COMMAND,
INSERT_OVERWRITE_TABLE_COMMAND,
LOAD_COMMAND,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index f69a868182d..f1afbd135ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -44,7 +44,9 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
/**
* Abstract insert executor.
@@ -70,6 +72,25 @@ public abstract class AbstractInsertExecutor {
protected final boolean emptyInsert;
protected long txnId = INVALID_TXN_ID;
+ /**
+ * Insert executor listener
+ */
+ public interface InsertExecutorListener {
+ /**
+ * Called before insert execution begins
+ */
+
+ default void beforeComplete(AbstractInsertExecutor insertExecutor, StmtExecutor executor, long jobId)
+ throws Exception {
+ }
+
+ default void afterComplete(AbstractInsertExecutor insertExecutor, StmtExecutor executor, long jobId)
+ throws Exception {
+ }
+ }
+
+ private List<InsertExecutorListener> listeners = new CopyOnWriteArrayList<>();
+
/**
* constructor
*/
@@ -91,6 +112,14 @@ public abstract class AbstractInsertExecutor {
this.jobId = jobId;
}
+ public void registerListener(InsertExecutorListener listener) {
+ listeners.add(listener);
+ }
+
+ public void unregisterListener(InsertExecutorListener listener) {
+ listeners.remove(listener);
+ }
+
public Coordinator getCoordinator() {
return coordinator;
}
@@ -208,7 +237,13 @@ public abstract class AbstractInsertExecutor {
executor.updateProfile(false);
execImpl(executor);
checkStrictModeAndFilterRatio();
+ for (InsertExecutorListener listener : listeners) {
+ listener.beforeComplete(this, executor, jobId);
+ }
onComplete();
+ for (InsertExecutorListener listener : listeners) {
+ listener.afterComplete(this, executor, jobId);
+ }
} catch (Throwable t) {
onFail(t);
// retry insert into from select when meet E-230 in cloud
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BlackholeInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BlackholeInsertExecutor.java
new file mode 100644
index 00000000000..b2ef9c19efa
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BlackholeInsertExecutor.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
+import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.StmtExecutor;
+
+import com.google.common.base.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Optional;
+
+/**
+ * Insert executor for blackhole table
+ */
+public class BlackholeInsertExecutor extends AbstractInsertExecutor {
+ private static final Logger LOG = LogManager.getLogger(BlackholeInsertExecutor.class);
+
+ /**
+ * constructor
+ */
+ public BlackholeInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner,
+ Optional<InsertCommandContext> insertCtx, boolean emptyInsert, long jobId) {
+ super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId);
+ }
+
+ @Override
+ public void beginTransaction() {
+ }
+
+ @Override
+ protected void beforeExec() throws UserException {
+ // do nothing
+ }
+
+ @Override
+ protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
+ // do nothing
+ }
+
+ @Override
+ protected void onComplete() {
+ if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) {
+ LOG.warn("blackhole insert failed. label: {}, error: {}",
+ labelName, coordinator.getExecStatus().getErrorMsg());
+ }
+ }
+
+ @Override
+ protected void onFail(Throwable t) {
+ errMsg = t.getMessage() == null ? "unknown reason" :
Util.getRootCauseMessage(t);
+ String queryId = DebugUtil.printId(ctx.queryId());
+ LOG.warn("insert [{}] with query id {} abort txn {} failed",
labelName, queryId, txnId);
+ StringBuilder sb = new StringBuilder(errMsg);
+ if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) {
+ sb.append(". url: ").append(coordinator.getTrackingUrl());
+ }
+ ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString());
+ }
+
+ @Override
+ protected void afterExec(StmtExecutor executor) {
+
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index f2d9d28153f..3ac25f371ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -55,8 +55,10 @@ import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.commands.NeedAuditEncryption;
+import org.apache.doris.nereids.trees.plans.commands.insert.AbstractInsertExecutor.InsertExecutorListener;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalBlackholeSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDictionarySink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
@@ -117,18 +119,20 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
private final Optional<LogicalPlan> cte;
private final boolean needNormalizePlan;
+ private InsertExecutorListener insertExecutorListener;
+
public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> labelName,
Optional<InsertCommandContext> insertCtx, Optional<LogicalPlan> cte) {
- this(logicalQuery, labelName, insertCtx, cte, true, Optional.empty());
+ this(PlanType.INSERT_INTO_TABLE_COMMAND, logicalQuery, labelName, insertCtx, cte, true, Optional.empty());
}
/**
* constructor
*/
- public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String>
labelName,
+ public InsertIntoTableCommand(PlanType planType, LogicalPlan logicalQuery,
Optional<String> labelName,
Optional<InsertCommandContext> insertCtx,
Optional<LogicalPlan> cte,
boolean needNormalizePlan, Optional<String>
branchName) {
- super(PlanType.INSERT_INTO_TABLE_COMMAND);
+ super(planType);
this.originLogicalQuery = Objects.requireNonNull(logicalQuery,
"logicalQuery should not be null");
this.labelName = Objects.requireNonNull(labelName, "labelName should
not be null");
this.logicalQuery = Optional.empty();
@@ -143,6 +147,13 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
}
}
+ public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String>
labelName,
+ Optional<InsertCommandContext> insertCtx, Optional<LogicalPlan>
cte,
+ boolean needNormalizePlan, Optional<String> branchName) {
+ this(PlanType.INSERT_INTO_TABLE_COMMAND, logicalQuery, labelName,
insertCtx, cte,
+ needNormalizePlan, branchName);
+ }
+
/**
* constructor for derived class
*/
@@ -190,6 +201,10 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
this.jobId = jobId;
}
+    public void setInsertExecutorListener(InsertExecutorListener insertExecutorListener) {
+        this.insertExecutorListener = insertExecutorListener;
+    }
+
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws
Exception {
runInternal(ctx, executor);
@@ -229,7 +244,7 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
while (++retryTimes <
Math.max(ctx.getSessionVariable().dmlPlanRetryTimes, 3)) {
TableIf targetTableIf = getTargetTableIf(ctx,
qualifiedTargetTableName);
// check auth
- if (!Env.getCurrentEnv().getAccessManager()
+                if (needAuthCheck(targetTableIf) && !Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(),
targetTableIf.getDatabase().getCatalog().getName(),
targetTableIf.getDatabase().getFullName(),
targetTableIf.getName(),
PrivPredicate.LOAD)) {
@@ -299,6 +314,16 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
        throw new AnalysisException("Insert plan failed. Could not get target table lock.");
}
+    /**
+     * Hook method to determine whether the auth check is needed.
+     * Subclasses can override this to skip the auth check, e.g., WarmupSelectCommand.
+     * @param targetTableIf the target table
+     * @return true if the auth check is needed, false otherwise
+     */
+ protected boolean needAuthCheck(TableIf targetTableIf) {
+ return true;
+ }
+
private BuildInsertExecutorResult initPlanOnce(ConnectContext ctx,
StmtExecutor stmtExecutor, TableIf targetTableIf) throws Throwable
{
targetTableIf.readLock();
@@ -462,6 +487,12 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
return ExecutorFactory.from(planner, dataSink, physicalSink,
() -> new DictionaryInsertExecutor(
ctx, dictionary, label, planner, insertCtx,
emptyInsert, jobId));
+        } else if (physicalSink instanceof PhysicalBlackholeSink) {
+            boolean emptyInsert = childIsEmptyRelation(physicalSink);
+            // insertCtx is not used by the blackhole sink, so keeping it empty is fine.
+            return ExecutorFactory.from(planner, dataSink, physicalSink,
+                    () -> new BlackholeInsertExecutor(ctx, targetTableIf, label, planner, insertCtx, emptyInsert,
+                            jobId));
} else {
// TODO: support other table types
throw new AnalysisException(
@@ -540,6 +571,9 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
if (insertExecutor.isEmptyInsert()) {
return;
}
+ if (insertExecutorListener != null) {
+ insertExecutor.registerListener(insertExecutorListener);
+ }
insertExecutor.executeSingleInsert(executor);
}
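The InsertExecutorListener registered above is declared in AbstractInsertExecutor (modified earlier in this patch, outside this excerpt). A minimal sketch of the shape implied by the call sites in this diff; the committed declaration may differ:

    // Inferred from registerListener(...) above and the anonymous implementation
    // in WarmupSelectCommand below; a sketch, not the committed API.
    public interface InsertExecutorListener {
        void beforeComplete(AbstractInsertExecutor insertExecutor, StmtExecutor executor, long jobId)
                throws Exception;

        void afterComplete(AbstractInsertExecutor insertExecutor, StmtExecutor executor, long jobId)
                throws Exception;
    }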
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
index 077e57daaa9..1cfef22a10e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
@@ -33,6 +33,7 @@ import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.analyzer.Scope;
import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundBlackholeSink;
import org.apache.doris.nereids.analyzer.UnboundDictionarySink;
import org.apache.doris.nereids.analyzer.UnboundFunction;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
@@ -578,6 +579,8 @@ public class InsertUtils {
unboundTableSink = (UnboundJdbcTableSink<? extends Plan>) plan;
} else if (plan instanceof UnboundDictionarySink) {
unboundTableSink = (UnboundDictionarySink<? extends Plan>) plan;
+ } else if (plan instanceof UnboundBlackholeSink) {
+ unboundTableSink = (UnboundBlackholeSink<? extends Plan>) plan;
} else {
throw new AnalysisException(
"the root of plan only accept Olap, Dictionary, Hive,
Iceberg or Jdbc table sink, but it is "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/WarmupSelectCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/WarmupSelectCommand.java
new file mode 100644
index 00000000000..e997f362b0e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/WarmupSelectCommand.java
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.insert.AbstractInsertExecutor.InsertExecutorListener;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.qe.CoordInterface;
+import org.apache.doris.qe.QeProcessorImpl;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.ShowResultSet;
+import org.apache.doris.qe.ShowResultSetMetaData;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.TQueryStatistics;
+import org.apache.doris.thrift.TQueryStatisticsResult;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * File cache warm up select command, such as "warm up select * from table where ...".
+ */
+public class WarmupSelectCommand extends InsertIntoTableCommand {
+
+ private static final long QUERY_STATISTICS_TIMEOUT_MS = 5000; // 5 seconds
+ private static final long QUERY_STATISTICS_INTERVAL_MS = 1000; // 1 second
+
+    // Metadata for the result set: one row of aggregated statistics per BE, plus a TOTAL row
+    private static final ShowResultSetMetaData metaData = ShowResultSetMetaData.builder()
+ .addColumn(new Column("BackendId", ScalarType.createVarchar(20)))
+ .addColumn(new Column("ScanRows",
ScalarType.createType(PrimitiveType.BIGINT)))
+ .addColumn(new Column("ScanBytes",
ScalarType.createType(PrimitiveType.BIGINT)))
+ .addColumn(new Column("ScanBytesFromLocalStorage",
ScalarType.createType(PrimitiveType.BIGINT)))
+ .addColumn(new Column("ScanBytesFromRemoteStorage",
ScalarType.createType(PrimitiveType.BIGINT)))
+ .addColumn(new Column("BytesWriteIntoCache",
ScalarType.createType(PrimitiveType.BIGINT)))
+ .build();
+
+ /**
+ * WarmupSelectCommand constructor
+ */
+    public WarmupSelectCommand(LogicalPlan logicalQuery) {
+        super(PlanType.INSERT_INTO_BLACKHOLE_COMMAND, logicalQuery, Optional.empty(), Optional.empty(),
+                Optional.empty(), true, Optional.empty());
+        super.setInsertExecutorListener(new InsertExecutorListener() {
+            @Override
+            public void beforeComplete(AbstractInsertExecutor insertExecutor, StmtExecutor executor, long jobId)
+                    throws Exception {
+                // no-op
+            }
+
+            @Override
+            public void afterComplete(AbstractInsertExecutor insertExecutor, StmtExecutor executor, long jobId)
+                    throws Exception {
+                if (insertExecutor.ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) {
+                    return;
+                }
+                Map<Long, TQueryStatisticsResult> statistics = pollForQueryStatistics(insertExecutor.ctx.queryId(),
+                        QUERY_STATISTICS_TIMEOUT_MS, QUERY_STATISTICS_INTERVAL_MS);
+                if (statistics != null) {
+                    sendAggregatedBlackholeResults(statistics, executor);
+                }
+            }
+        });
+    }
+
+ /**
+     * Poll for query statistics until the query finishes or the timeout is reached.
+     * Keeps the latest stats from each BE until every BE has reported isFinished or the overall timeout expires.
+ *
+ * @param queryId The query ID to poll for
+ * @param timeoutMs Maximum time to wait in milliseconds
+ * @param intervalMs Time between polls in milliseconds
+ * @return Map of backend ID to query statistics result
+ */
+    private Map<Long, TQueryStatisticsResult> pollForQueryStatistics(TUniqueId queryId,
+            long timeoutMs, long intervalMs) {
+ long startTime = System.currentTimeMillis();
+ Map<Long, TQueryStatisticsResult> beIdToStatisticsResult;
+
+ // Get the coordinator to know how many backends are involved
+ CoordInterface coor = QeProcessorImpl.INSTANCE.getCoordinator(queryId);
+ int expectedBackendCount = 0;
+ if (coor == null) {
+ return null;
+ }
+
+ expectedBackendCount = coor.getInvolvedBackends().size();
+
+        // Track which backends have finished and store the latest statistics for each backend
+        Set<Long> finishedBackends = Sets.newHashSet();
+        Map<Long, TQueryStatisticsResult> latestStatisticsResult = Maps.newHashMap();
+
+ // Continue polling until all backends are finished or timeout
+ while (System.currentTimeMillis() - startTime < timeoutMs &&
finishedBackends.size() < expectedBackendCount) {
+ // Get current statistics
+ beIdToStatisticsResult =
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr()
+ .getQueryStatistics(DebugUtil.printId(queryId));
+
+            // Update the latest statistics for each backend and check which ones have finished
+ if (beIdToStatisticsResult != null &&
!beIdToStatisticsResult.isEmpty()) {
+ for (Map.Entry<Long, TQueryStatisticsResult> entry :
beIdToStatisticsResult.entrySet()) {
+ Long beId = entry.getKey();
+ TQueryStatisticsResult result = entry.getValue();
+
+ // Always update the latest statistics for this backend
+ latestStatisticsResult.put(beId, result);
+
+ // If this backend has finished, mark it as finished
+ if (result.isSetQueryFinished() &&
result.isQueryFinished()) {
+ finishedBackends.add(beId);
+ }
+ }
+ }
+
+ // If not all backends have finished, continue polling
+ if (finishedBackends.size() < expectedBackendCount) {
+ // Sleep before next poll
+ try {
+ Thread.sleep(intervalMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+
+ return latestStatisticsResult;
+ }
+
+ /**
+ * Send aggregated blackhole results to the client.
+ * Aggregates statistics from all backends and sends a summary result set.
+ *
+ * @param statisticsResult Map of backend ID to query statistics result
+ * @param executor The statement executor to send results through
+ * @throws IOException If sending the result set fails
+ */
+    public void sendAggregatedBlackholeResults(Map<Long, TQueryStatisticsResult> statisticsResult,
+            StmtExecutor executor) throws IOException {
+ List<List<String>> rows = Lists.newArrayList();
+ long totalScanRows = 0;
+ long totalScanBytes = 0;
+ long totalScanBytesFromLocalStorage = 0;
+ long totalScanBytesFromRemoteStorage = 0;
+ long totalBytesWriteIntoCache = 0;
+
+ // Add a row for each BE with its aggregated data
+ for (Map.Entry<Long, TQueryStatisticsResult> entry :
statisticsResult.entrySet()) {
+ Long beId = entry.getKey();
+ TQueryStatisticsResult data = entry.getValue();
+ if (!data.isSetStatistics()) {
+ continue;
+ }
+ TQueryStatistics statistics = data.getStatistics();
+ List<String> row = Lists.newArrayList(
+ beId.toString(),
+ String.valueOf(statistics.getScanRows()),
+ String.valueOf(statistics.getScanBytes()),
+ String.valueOf(statistics.getScanBytesFromLocalStorage()),
+ String.valueOf(statistics.getScanBytesFromRemoteStorage()),
+ String.valueOf(statistics.getBytesWriteIntoCache())
+ );
+
+ rows.add(row);
+
+ // Accumulate totals
+ totalScanRows += statistics.getScanRows();
+ totalScanBytes += statistics.getScanBytes();
+ totalScanBytesFromLocalStorage +=
statistics.getScanBytesFromLocalStorage();
+ totalScanBytesFromRemoteStorage +=
statistics.getScanBytesFromRemoteStorage();
+ totalBytesWriteIntoCache += statistics.getBytesWriteIntoCache();
+ }
+
+ // Add a total row
+ List<String> totalRow = Lists.newArrayList(
+ "TOTAL",
+ String.valueOf(totalScanRows),
+ String.valueOf(totalScanBytes),
+ String.valueOf(totalScanBytesFromLocalStorage),
+ String.valueOf(totalScanBytesFromRemoteStorage),
+ String.valueOf(totalBytesWriteIntoCache)
+ );
+ rows.add(totalRow);
+
+ ShowResultSet resultSet = new ShowResultSet(metaData, rows);
+ executor.sendResultSet(resultSet);
+ }
+
+    /**
+     * Skip the auth check for the warm up select command.
+     * The blackhole table is a system table and does not require the LOAD privilege;
+     * the Nereids analyzer still checks the SELECT privilege on the source tables.
+     */
+ @Override
+ protected boolean needAuthCheck(TableIf targetTableIf) {
+ return false;
+ }
+}
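Taken together, the command behaves like a SELECT whose rows vanish at the sink, followed by a per-BE statistics report. An illustrative session (column names come from the metaData above; the backend id and numbers are made up):

    WARM UP SELECT * FROM lineitem;
    +-----------+----------+-----------+---------------------------+----------------------------+---------------------+
    | BackendId | ScanRows | ScanBytes | ScanBytesFromLocalStorage | ScanBytesFromRemoteStorage | BytesWriteIntoCache |
    +-----------+----------+-----------+---------------------------+----------------------------+---------------------+
    | 10001     | 5        | 1024      | 0                         | 1024                       | 1024                |
    | TOTAL     | 5        | 1024      | 0                         | 1024                       | 1024                |
    +-----------+----------+-----------+---------------------------+----------------------------+---------------------+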
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalBlackholeSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalBlackholeSink.java
new file mode 100644
index 00000000000..97b6c220b86
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalBlackholeSink.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.PropagateFuncDeps;
+import org.apache.doris.nereids.trees.plans.algebra.Sink;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Logical blackhole sink.
+ * The blackhole sink is currently used by "warm up select" SQL statements to preload table data into the file block cache.
+ * It is planned as a terminal sink (like /dev/null): a "black hole" at the end of the execution plan that discards all incoming data.
+ */
+public class LogicalBlackholeSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE>
+        implements Sink, PropagateFuncDeps {
+
+    public LogicalBlackholeSink(List<NamedExpression> outputExprs, CHILD_TYPE child) {
+ super(PlanType.LOGICAL_BLACKHOLE_SINK, outputExprs, child);
+ }
+
+ public LogicalBlackholeSink(List<NamedExpression> outputExprs,
+ Optional<GroupExpression> groupExpression,
+ Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
+ super(PlanType.LOGICAL_BLACKHOLE_SINK, outputExprs, groupExpression,
logicalProperties, child);
+ }
+
+ @Override
+ public LogicalBlackholeSink<Plan> withChildren(List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1,
+                "LogicalBlackholeSink's children size must be 1, but the real size is %s", children.size());
+ return new LogicalBlackholeSink<>(outputExprs, children.get(0));
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitLogicalBlackholeSink(this, context);
+ }
+
+ @Override
+    public LogicalBlackholeSink<Plan> withGroupExpression(Optional<GroupExpression> groupExpression) {
+        return new LogicalBlackholeSink<>(outputExprs, groupExpression, Optional.of(getLogicalProperties()), child());
+ }
+
+ @Override
+    public LogicalBlackholeSink<Plan> withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1, "LogicalBlackholeSink only accepts one child");
+        return new LogicalBlackholeSink<>(outputExprs, groupExpression, logicalProperties, children.get(0));
+ }
+
+ @Override
+    public LogicalBlackholeSink<CHILD_TYPE> withOutputExprs(List<NamedExpression> outputExprs) {
+ return new LogicalBlackholeSink<>(outputExprs, child());
+ }
+
+ @Override
+ public String toString() {
+ return Utils.toSqlString("LogicalBlackholeSink[" + id.asInt() + "]",
+ "outputExprs", outputExprs);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBlackholeSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBlackholeSink.java
new file mode 100644
index 00000000000..53666ccab14
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBlackholeSink.java
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.SqlCacheContext;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.ComputeResultSet;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.Sink;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.qe.ResultSet;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.base.Preconditions;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Physical blackhole sink.
+ * The blackhole sink is currently used by "warm up select" SQL statements to preload table data into the file block cache.
+ * It is planned as a terminal sink (like /dev/null): a "black hole" at the end of the execution plan that discards all incoming data.
+ */
+public class PhysicalBlackholeSink<CHILD_TYPE extends Plan> extends PhysicalSink<CHILD_TYPE>
+        implements Sink, ComputeResultSet {
+
+    public PhysicalBlackholeSink(List<NamedExpression> outputExprs, Optional<GroupExpression> groupExpression,
+            LogicalProperties logicalProperties, CHILD_TYPE child) {
+        this(outputExprs, groupExpression, logicalProperties, PhysicalProperties.GATHER, null, child);
+    }
+
+    public PhysicalBlackholeSink(List<NamedExpression> outputExprs, Optional<GroupExpression> groupExpression,
+            LogicalProperties logicalProperties, @Nullable PhysicalProperties physicalProperties,
+            Statistics statistics, CHILD_TYPE child) {
+        super(PlanType.PHYSICAL_BLACKHOLE_SINK, outputExprs, groupExpression,
+                logicalProperties, physicalProperties, statistics, child);
+    }
+
+ @Override
+    public PhysicalBlackholeSink<Plan> withChildren(List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1,
+                "PhysicalBlackholeSink's children size must be 1, but the real size is %s", children.size());
+        return new PhysicalBlackholeSink<>(outputExprs, groupExpression, getLogicalProperties(),
+                physicalProperties, statistics, children.get(0));
+    }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitPhysicalBlackholeSink(this, context);
+ }
+
+ @Override
+ public List<? extends Expression> getExpressions() {
+ return outputExprs;
+ }
+
+ @Override
+    public PhysicalBlackholeSink<Plan> withGroupExpression(Optional<GroupExpression> groupExpression) {
+        return new PhysicalBlackholeSink<>(outputExprs, groupExpression, getLogicalProperties(),
+                physicalProperties, statistics, child());
+ }
+
+ @Override
+    public PhysicalBlackholeSink<Plan> withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1,
+                "PhysicalBlackholeSink's children size must be 1, but the real size is %s", children.size());
+        return new PhysicalBlackholeSink<>(outputExprs, groupExpression, logicalProperties.get(),
+                physicalProperties, statistics, children.get(0));
+    }
+
+ @Override
+ public PhysicalBlackholeSink<Plan> withPhysicalPropertiesAndStats(
+ PhysicalProperties physicalProperties, Statistics statistics) {
+ return new PhysicalBlackholeSink<>(outputExprs, groupExpression,
+ getLogicalProperties(), physicalProperties, statistics,
child());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PhysicalBlackholeSink<?> that = (PhysicalBlackholeSink<?>) o;
+ return outputExprs.equals(that.outputExprs);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(outputExprs);
+ }
+
+ @Override
+ public String toString() {
+ return Utils.toSqlString("PhysicalBlackholeSink[" + id.asInt() + "]",
+ "outputExprs", outputExprs);
+ }
+
+ @Override
+ public PhysicalBlackholeSink<CHILD_TYPE> resetLogicalProperties() {
+ return new PhysicalBlackholeSink<>(outputExprs, groupExpression,
+ null, physicalProperties, statistics, child());
+ }
+
+ @Override
+    public Optional<ResultSet> computeResultInFe(
+            CascadesContext cascadesContext, Optional<SqlCacheContext> sqlCacheContext, List<Slot> outputSlots) {
+        CHILD_TYPE child = child();
+        if (child instanceof ComputeResultSet) {
+            return ((ComputeResultSet) child).computeResultInFe(cascadesContext, sqlCacheContext, outputSlots);
+        } else {
+            return Optional.empty();
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
index 06257229d4b..411398ad2e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
@@ -17,6 +17,7 @@
package org.apache.doris.nereids.trees.plans.visitor;
+import org.apache.doris.nereids.analyzer.UnboundBlackholeSink;
import org.apache.doris.nereids.analyzer.UnboundDictionarySink;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
@@ -24,6 +25,7 @@ import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink;
import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalBlackholeSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalDictionarySink;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
@@ -34,6 +36,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalBlackholeSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDictionarySink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
@@ -86,6 +89,10 @@ public interface SinkVisitor<R, C> {
return visitLogicalSink(unboundDictionarySink, context);
}
+    default R visitUnboundBlackholeSink(UnboundBlackholeSink<? extends Plan> unboundBlackholeSink, C context) {
+ return visitLogicalSink(unboundBlackholeSink, context);
+ }
+
// *******************************
// logical
// *******************************
@@ -127,10 +134,20 @@ public interface SinkVisitor<R, C> {
return visitLogicalSink(logicalDeferMaterializeResultSink, context);
}
+    default R visitLogicalBlackholeSink(LogicalBlackholeSink<? extends Plan> logicalBlackholeSink, C context) {
+ return visitLogicalSink(logicalBlackholeSink, context);
+ }
+
// *******************************
// physical
// *******************************
+    default R visitPhysicalBlackholeSink(PhysicalBlackholeSink<? extends Plan> sink, C context) {
+ return visitPhysicalSink(sink, context);
+ }
+
default R visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink,
C context) {
return visitPhysicalSink(fileSink, context);
}
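The default methods above keep the new sink transparent to existing visitors: anything that does not override them falls through to visitLogicalSink or visitPhysicalSink. A hypothetical override, assuming the usual DefaultPlanVisitor base class and the inherited getOutputExprs() accessor (neither is shown in this excerpt):

    // Hypothetical visitor, for illustration only.
    class SinkDescriber extends DefaultPlanVisitor<String, Void> {
        @Override
        public String visitPhysicalBlackholeSink(PhysicalBlackholeSink<? extends Plan> sink, Void ctx) {
            return "blackhole sink discarding " + sink.getOutputExprs().size() + " columns";
        }
    }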
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BlackholeSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BlackholeSink.java
new file mode 100644
index 00000000000..9a3994a036d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BlackholeSink.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.thrift.TBlackholeSink;
+import org.apache.doris.thrift.TDataSink;
+import org.apache.doris.thrift.TDataSinkType;
+import org.apache.doris.thrift.TExplainLevel;
+
+/**
+ * BlackholeSink is currently used by "warm up select" SQL statements to preload table data into the file block cache.
+ * It is planned as a terminal sink (like /dev/null): a "black hole" at the end of the execution plan that discards all incoming data.
+ */
+public class BlackholeSink extends DataSink {
+ private final PlanNodeId exchNodeId;
+
+ public BlackholeSink(PlanNodeId exchNodeId) {
+ this.exchNodeId = exchNodeId;
+ }
+
+ @Override
+ public String getExplainString(String prefix, TExplainLevel explainLevel) {
+ StringBuilder strBuilder = new StringBuilder();
+ strBuilder.append(prefix).append("BLACKHOLE SINK\n");
+ return strBuilder.toString();
+ }
+
+ @Override
+ protected TDataSink toThrift() {
+ TDataSink result = new TDataSink(TDataSinkType.BLACKHOLE_SINK);
+ TBlackholeSink tBlackholeSink = new TBlackholeSink();
+ result.setBlackholeSink(tBlackholeSink);
+ return result;
+ }
+
+ public PlanNodeId getExchNodeId() {
+ return exchNodeId;
+ }
+
+ @Override
+ public DataPartition getOutputPartition() {
+ return null;
+ }
+}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index fafb736c15e..cc31e7858e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.thrift.TQueryStatistics;
+import org.apache.doris.thrift.TQueryStatisticsResult;
import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams;
import com.google.common.collect.Lists;
@@ -59,7 +60,7 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
this.beLastReportTime = beLastReportTime;
}
-        Map<String, Pair<Long, TQueryStatistics>> queryStatsMap = Maps.newConcurrentMap();
+        Map<String, Pair<Long, TQueryStatisticsResult>> queryStatsMap = Maps.newConcurrentMap();
}
public WorkloadRuntimeStatusMgr() {
@@ -164,7 +165,7 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
LOG.warn("be report workload runtime status but without beid");
return;
}
- if (!params.isSetQueryStatisticsMap()) {
+ if (!params.isSetQueryStatisticsResultMap()) {
LOG.warn("be report workload runtime status but without query
stats map");
return;
}
@@ -180,8 +181,8 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
} else {
beReportInfo.beLastReportTime = currentTime;
}
-        for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) {
-            beReportInfo.queryStatsMap.put(entry.getKey(), Pair.of(currentTime, (TQueryStatistics) entry.getValue()));
+        for (Map.Entry<String, TQueryStatisticsResult> entry : params.query_statistics_result_map.entrySet()) {
+            beReportInfo.queryStatsMap.put(entry.getKey(), Pair.of(currentTime, entry.getValue()));
}
}
@@ -197,7 +198,7 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
}
Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
for (String queryId : queryIdSet) {
-            Pair<Long, TQueryStatistics> pair = beReportInfo.queryStatsMap.get(queryId);
+            Pair<Long, TQueryStatisticsResult> pair = beReportInfo.queryStatsMap.get(queryId);
long queryLastReportTime = pair.first;
if (currentTime - queryLastReportTime >
Config.be_report_query_statistics_timeout_ms) {
beReportInfo.queryStatsMap.remove(queryId);
@@ -216,7 +217,7 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
for (String queryId : queryIdSet) {
-            TQueryStatistics curQueryStats = beReportInfo.queryStatsMap.get(queryId).second;
+            TQueryStatisticsResult curQueryStats = beReportInfo.queryStatsMap.get(queryId).second;
TQueryStatistics retQuery = resultQueryMap.get(queryId);
if (retQuery == null) {
@@ -230,19 +231,35 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
return resultQueryMap;
}
-    private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) {
-        dst.scan_rows += src.scan_rows;
-        dst.scan_bytes += src.scan_bytes;
-        dst.scan_bytes_from_local_storage += src.scan_bytes_from_local_storage;
-        dst.scan_bytes_from_remote_storage += src.scan_bytes_from_remote_storage;
-        dst.cpu_ms += src.cpu_ms;
-        dst.shuffle_send_bytes += src.shuffle_send_bytes;
-        dst.shuffle_send_rows += src.shuffle_send_rows;
-        if (dst.max_peak_memory_bytes < src.max_peak_memory_bytes) {
-            dst.max_peak_memory_bytes = src.max_peak_memory_bytes;
+    public Map<Long, TQueryStatisticsResult> getQueryStatistics(String queryId) {
+        Map<Long, TQueryStatisticsResult> result = Maps.newHashMap();
+        for (Map.Entry<Long, BeReportInfo> entry : beToQueryStatsMap.entrySet()) {
+            Pair<Long, TQueryStatisticsResult> pair = entry.getValue().queryStatsMap.get(queryId);
+            if (pair != null) {
+                result.put(entry.getKey(), pair.second);
+            }
+        }
+        return result;
+    }
+
+
+    private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatisticsResult src) {
+        TQueryStatistics srcStats = src.getStatistics();
+        if (srcStats == null) {
+            return;
         }
-        dst.spill_write_bytes_to_local_storage += src.spill_write_bytes_to_local_storage;
-        dst.spill_read_bytes_from_local_storage += src.spill_read_bytes_from_local_storage;
+        dst.scan_rows += srcStats.scan_rows;
+        dst.scan_bytes += srcStats.scan_bytes;
+        dst.scan_bytes_from_local_storage += srcStats.scan_bytes_from_local_storage;
+        dst.scan_bytes_from_remote_storage += srcStats.scan_bytes_from_remote_storage;
+        dst.cpu_ms += srcStats.cpu_ms;
+        dst.shuffle_send_bytes += srcStats.shuffle_send_bytes;
+        dst.shuffle_send_rows += srcStats.shuffle_send_rows;
+        if (dst.max_peak_memory_bytes < srcStats.max_peak_memory_bytes) {
+            dst.max_peak_memory_bytes = srcStats.max_peak_memory_bytes;
+        }
+        dst.spill_write_bytes_to_local_storage += srcStats.spill_write_bytes_to_local_storage;
+        dst.spill_read_bytes_from_local_storage += srcStats.spill_read_bytes_from_local_storage;
     }
private void queryAuditEventLogWriteLock() {
@@ -252,4 +269,5 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
private void queryAuditEventLogWriteUnlock() {
queryAuditEventLock.unlock();
}
+
}
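Callers can combine the per-BE map returned by the new getQueryStatistics however they like; a short illustrative use that sums the cache-write counters across backends, built only from calls that appear in this patch (the surrounding queryId variable is assumed):

    Map<Long, TQueryStatisticsResult> perBe = Env.getCurrentEnv()
            .getWorkloadRuntimeStatusMgr().getQueryStatistics(DebugUtil.printId(queryId));
    long bytesWarmed = perBe.values().stream()
            .filter(TQueryStatisticsResult::isSetStatistics)
            .mapToLong(r -> r.getStatistics().getBytesWriteIntoCache())
            .sum();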
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/NereidsParserTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/NereidsParserTest.java
index cf373be7507..7ff1273fcca 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/NereidsParserTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/NereidsParserTest.java
@@ -1461,4 +1461,31 @@ public class NereidsParserTest extends ParserTestBase {
+ "WHEN NOT MATCHED THEN INSERT VALUES (c1, c2, c3)";
Assertions.assertThrows(ParseException.class, () ->
parser.parseSingle(invalidSql4));
}
+
+ @Test
+ public void testWarmUpSelect() {
+ ConnectContext ctx = ConnectContext.get();
+ ctx.getSessionVariable().setEnableFileCache(true);
+ ctx.getSessionVariable().setDisableFileCache(false);
+ NereidsParser nereidsParser = new NereidsParser();
+
+ // Test basic warm up select statement
+ String warmUpSql = "WARM UP SELECT * FROM test_table";
+ LogicalPlan logicalPlan = nereidsParser.parseSingle(warmUpSql);
+ Assertions.assertNotNull(logicalPlan);
+ Assertions.assertEquals(StmtType.INSERT, logicalPlan.stmtType());
+
+ // Test warm up select with where clause
+        String warmUpSqlWithWhere = "WARM UP SELECT id, name FROM test_table WHERE id > 10";
+        LogicalPlan logicalPlanWithWhere = nereidsParser.parseSingle(warmUpSqlWithWhere);
+        Assertions.assertNotNull(logicalPlanWithWhere);
+        Assertions.assertEquals(StmtType.INSERT, logicalPlanWithWhere.stmtType());
+
+        // Negative cases: LIMIT, JOIN, UNION, aggregation, and ORDER BY are not allowed
+        Assertions.assertThrows(ParseException.class,
+                () -> nereidsParser.parseSingle("WARM UP SELECT * FROM test_table LIMIT 100"));
+        Assertions.assertThrows(ParseException.class,
+                () -> nereidsParser.parseSingle("WARM UP SELECT * FROM t1 JOIN t2 ON t1.id = t2.id"));
+        Assertions.assertThrows(ParseException.class,
+                () -> nereidsParser.parseSingle("WARM UP SELECT * FROM t1 UNION SELECT * FROM t2"));
+        Assertions.assertThrows(ParseException.class,
+                () -> nereidsParser.parseSingle("WARM UP SELECT id, COUNT(*) FROM test_table GROUP BY id"));
+        Assertions.assertThrows(ParseException.class,
+                () -> nereidsParser.parseSingle("WARM UP SELECT * FROM test_table ORDER BY id"));
+ }
}
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index fb4ebd90e00..bda5daa03bf 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -39,6 +39,7 @@ message PQueryStatistics {
optional int64 scan_bytes_from_remote_storage = 8;
optional int64 spill_write_bytes_to_local_storage = 9;
optional int64 spill_read_bytes_from_local_storage = 10;
+ optional int64 bytes_write_into_cache = 11;
}
message PRowBatch {
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index dcb62ea4159..d218db5dba5 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -41,6 +41,7 @@ enum TDataSinkType {
HIVE_TABLE_SINK = 13,
ICEBERG_TABLE_SINK = 14,
DICTIONARY_SINK = 15,
+ BLACKHOLE_SINK = 16,
}
enum TResultSinkType {
@@ -444,6 +445,9 @@ struct TDictionarySink {
9: optional i64 memory_limit
}
+struct TBlackholeSink {
+}
+
struct TDataSink {
1: required TDataSinkType type
2: optional TDataStreamSink stream_sink
@@ -459,4 +463,5 @@ struct TDataSink {
13: optional THiveTableSink hive_table_sink
14: optional TIcebergTableSink iceberg_table_sink
15: optional TDictionarySink dictionary_sink
+ 16: optional TBlackholeSink blackhole_sink
}
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index eca2ef59d9e..b9fe52c59f2 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -159,6 +159,7 @@ enum TSchemaTableType {
SCH_SQL_BLOCK_RULE_STATUS = 59;
SCH_CLUSTER_SNAPSHOTS = 60;
SCH_CLUSTER_SNAPSHOT_PROPERTIES = 61;
+ SCH_BLACKHOLE = 62;
}
enum THdfsCompression {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index f0399df51ec..54a6d1555e5 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -201,11 +201,18 @@ struct TQueryStatistics {
11: optional i64 scan_bytes_from_remote_storage
12: optional i64 spill_write_bytes_to_local_storage
13: optional i64 spill_read_bytes_from_local_storage
+ 14: optional i64 bytes_write_into_cache
+}
+
+struct TQueryStatisticsResult {
+ 1: optional bool query_finished
+ 2: optional TQueryStatistics statistics
}
struct TReportWorkloadRuntimeStatusParams {
1: optional i64 backend_id
- 2: optional map<string, TQueryStatistics> query_statistics_map
+    2: optional map<string, TQueryStatistics> query_statistics_map // deprecated
+ 3: optional map<string, TQueryStatisticsResult> query_statistics_result_map
}
struct TQueryProfile {
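With the new struct, each BE report carries a completion flag alongside the counters, which is what lets the FE-side polling loop stop early. A sketch of building one report in Java with the generated Thrift classes; the setter names are assumed to mirror the getters used elsewhere in this patch, and all values are illustrative:

    TQueryStatistics stats = new TQueryStatistics();
    stats.setScanRows(5L);                 // setter names assumed from the IDL above
    stats.setBytesWriteIntoCache(1024L);

    TQueryStatisticsResult result = new TQueryStatisticsResult();
    result.setQueryFinished(true);         // lets the FE stop polling for this BE
    result.setStatistics(stats);

    TReportWorkloadRuntimeStatusParams params = new TReportWorkloadRuntimeStatusParams();
    params.setBackendId(10001L);           // illustrative backend id
    Map<String, TQueryStatisticsResult> resultMap = new HashMap<>();
    resultMap.put(queryIdHex, result);     // queryIdHex: hypothetical printed query id
    params.setQueryStatisticsResultMap(resultMap);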
diff --git a/regression-test/suites/cloud_p0/warm_up_select/test_warmup_select.groovy b/regression-test/suites/cloud_p0/warm_up_select/test_warmup_select.groovy
new file mode 100644
index 00000000000..2b0b39affc2
--- /dev/null
+++ b/regression-test/suites/cloud_p0/warm_up_select/test_warmup_select.groovy
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_warmup_select") {
+ sql """
+ drop table if exists lineitem
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS lineitem (
+ l_orderkey INTEGER NOT NULL,
+ l_partkey INTEGER NOT NULL,
+ l_suppkey INTEGER NOT NULL,
+ l_linenumber INTEGER NOT NULL,
+ l_quantity DECIMALV3(15,2) NOT NULL,
+ l_extendedprice DECIMALV3(15,2) NOT NULL,
+ l_discount DECIMALV3(15,2) NOT NULL,
+ l_tax DECIMALV3(15,2) NOT NULL,
+ l_returnflag CHAR(1) NOT NULL,
+ l_linestatus CHAR(1) NOT NULL,
+ l_shipdate DATE NOT NULL,
+ l_commitdate DATE NOT NULL,
+ l_receiptdate DATE NOT NULL,
+ l_shipinstruct CHAR(25) NOT NULL,
+ l_shipmode CHAR(10) NOT NULL,
+ l_comment VARCHAR(44) NOT NULL
+ )
+ DUPLICATE KEY(l_orderkey, l_partkey, l_suppkey, l_linenumber)
+ PARTITION BY RANGE(l_shipdate) (
+ PARTITION `day_2` VALUES LESS THAN ('2023-12-9'),
+ PARTITION `day_3` VALUES LESS THAN ("2023-12-11"),
+ PARTITION `day_4` VALUES LESS THAN ("2023-12-30")
+ )
+ DISTRIBUTED BY HASH(l_orderkey) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ sql """
+ insert into lineitem values
+ (1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-08', '2023-12-09',
'2023-12-10', 'a', 'b', 'yyyyyyyyy'),
+ (2, 4, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-09', '2023-12-09',
'2023-12-10', 'a', 'b', 'yyyyyyyyy'),
+ (3, 2, 4, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-10', '2023-12-09',
'2023-12-10', 'a', 'b', 'yyyyyyyyy'),
+ (4, 3, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-11', '2023-12-09',
'2023-12-10', 'a', 'b', 'yyyyyyyyy'),
+ (5, 2, 3, 6, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-12-12', '2023-12-12',
'2023-12-13', 'c', 'd', 'xxxxxxxxx');
+ """
+
+ def test_basic_warmup = {
+ // Enable file cache for warm up functionality
+ sql "set disable_file_cache=false"
+
+ sql "WARM UP SELECT * FROM lineitem"
+
+ sql "WARM UP SELECT l_orderkey, l_discount FROM lineitem"
+
+ sql "WARM UP SELECT l_orderkey, l_discount FROM lineitem WHERE
l_quantity > 10"
+ }
+
+ def test_warmup_negative_cases = {
+ // Enable file cache for warm up functionality
+ sql "set disable_file_cache=false"
+
+ // These should fail as warm up select doesn't support these operations
+ try {
+ sql "WARM UP SELECT * FROM lineitem LIMIT 5"
+ assert false : "Expected ParseException for LIMIT clause"
+ } catch (Exception e) {
+ // Expected to fail
+ println "LIMIT clause correctly rejected for WARM UP SELECT"
+ }
+
+ try {
+ sql "WARM UP SELECT l_shipmode, COUNT(*) FROM lineitem GROUP BY
l_shipmode"
+ assert false : "Expected ParseException for GROUP BY clause"
+ } catch (Exception e) {
+ // Expected to fail
+ println "GROUP BY clause correctly rejected for WARM UP SELECT"
+ }
+
+ try {
+ sql "WARM UP SELECT * FROM lineitem t1 JOIN lineitem t2 ON
t1.l_orderkey = t2.l_orderkey"
+ assert false : "Expected ParseException for JOIN clause"
+ } catch (Exception e) {
+ // Expected to fail
+ println "JOIN clause correctly rejected for WARM UP SELECT"
+ }
+
+ try {
+ sql "WARM UP SELECT * FROM lineitem UNION SELECT * FROM lineitem"
+ assert false : "Expected ParseException for UNION clause"
+ } catch (Exception e) {
+ // Expected to fail
+ println "UNION clause correctly rejected for WARM UP SELECT"
+ }
+ }
+
+ test_basic_warmup()
+ test_warmup_negative_cases()
+}
\ No newline at end of file
diff --git a/regression-test/suites/external_table_p0/cache/test_hive_warmup_select.groovy b/regression-test/suites/external_table_p0/cache/test_hive_warmup_select.groovy
new file mode 100644
index 00000000000..7d270ffe3b8
--- /dev/null
+++ b/regression-test/suites/external_table_p0/cache/test_hive_warmup_select.groovy
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hive_warmup_select",
"p0,external,hive,external_docker,external_docker_hive") {
+ String enabled = context.config.otherConfigs.get("enableHiveTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable Hive test.")
+ return;
+ }
+
+ def test_basic_warmup = {
+ // Enable file cache for warm up functionality
+ sql "set enable_file_cache=true"
+ sql "set disable_file_cache=false"
+
+ sql "WARM UP SELECT * FROM lineitem"
+
+ sql "WARM UP SELECT l_orderkey, l_discount FROM lineitem"
+
+ sql "WARM UP SELECT l_orderkey, l_discount FROM lineitem WHERE
l_quantity > 10"
+ }
+
+ def test_warmup_negative_cases = {
+ // Enable file cache for warm up functionality
+ sql "set enable_file_cache=true"
+ sql "set disable_file_cache=false"
+
+ // These should fail as warm up select doesn't support these operations
+ try {
+ sql "WARM UP SELECT * FROM lineitem LIMIT 5"
+ assert false : "Expected ParseException for LIMIT clause"
+ } catch (Exception e) {
+ // Expected to fail
+ println "LIMIT clause correctly rejected for WARM UP SELECT"
+ }
+
+ try {
+ sql "WARM UP SELECT l_shipmode, COUNT(*) FROM lineitem GROUP BY
l_shipmode"
+ assert false : "Expected ParseException for GROUP BY clause"
+ } catch (Exception e) {
+ // Expected to fail
+ println "GROUP BY clause correctly rejected for WARM UP SELECT"
+ }
+
+ try {
+ sql "WARM UP SELECT * FROM lineitem t1 JOIN lineitem t2 ON
t1.l_orderkey = t2.l_orderkey"
+ assert false : "Expected ParseException for JOIN clause"
+ } catch (Exception e) {
+ // Expected to fail
+ println "JOIN clause correctly rejected for WARM UP SELECT"
+ }
+
+ try {
+ sql "WARM UP SELECT * FROM lineitem UNION SELECT * FROM lineitem"
+ assert false : "Expected ParseException for UNION clause"
+ } catch (Exception e) {
+ // Expected to fail
+ println "UNION clause correctly rejected for WARM UP SELECT"
+ }
+ }
+
+ def test_warmup_permission = { String catalog_name ->
+        // Test that warm up select only requires the SELECT privilege, not the LOAD privilege
+        def user1 = 'test_hive_warmup_user1'
+        def pwd = '123456'
+        def tokens = context.config.jdbcUrl.split('/')
+        def url = tokens[0] + "//" + tokens[2] + "/?defaultCatalog=${catalog_name}"
+
+ // Clean up
+ sql """DROP USER IF EXISTS ${user1}"""
+
+ try {
+            // Create a user with only the SELECT privilege on the external catalog
+            sql """CREATE USER '${user1}' IDENTIFIED BY '${pwd}'"""
+            sql """GRANT SELECT_PRIV ON ${catalog_name}.tpch1_parquet.lineitem TO ${user1}"""
+
+            // Test: a user with only the SELECT privilege should be able to run WARM UP SELECT
+ connect(user1, "${pwd}", url) {
+ sql "set enable_file_cache=true"
+ sql "set disable_file_cache=false"
+
+                // This should succeed: only the SELECT privilege is needed
+                sql "WARM UP SELECT * FROM ${catalog_name}.tpch1_parquet.lineitem"
+
+                sql "WARM UP SELECT l_orderkey, l_discount FROM ${catalog_name}.tpch1_parquet.lineitem WHERE l_quantity > 10"
+
+                // Verify regular SELECT also works
+                def result = sql "SELECT COUNT(*) FROM ${catalog_name}.tpch1_parquet.lineitem"
+ assert result.size() > 0
+ }
+
+            // Test: a user without the SELECT privilege should fail
+            sql """REVOKE SELECT_PRIV ON ${catalog_name}.tpch1_parquet.lineitem FROM ${user1}"""
+
+ connect(user1, "${pwd}", url) {
+ sql "set enable_file_cache=true"
+ sql "set disable_file_cache=false"
+ test {
+ sql "WARM UP SELECT * FROM
${catalog_name}.tpch1_parquet.lineitem"
+ exception "denied"
+ }
+ }
+
+            // Test: a user with the LOAD privilege but no SELECT privilege should also fail
+            sql """GRANT LOAD_PRIV ON ${catalog_name}.tpch1_parquet.lineitem TO ${user1}"""
+
+ connect(user1, "${pwd}", url) {
+ sql "set enable_file_cache=true"
+ sql "set disable_file_cache=false"
+ test {
+ sql "WARM UP SELECT * FROM
${catalog_name}.tpch1_parquet.lineitem"
+ exception "denied"
+ }
+ }
+
+ } finally {
+ // Clean up
+ sql """DROP USER IF EXISTS ${user1}"""
+ }
+ }
+
+ for (String hivePrefix : ["hive3"]) {
+        String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
+ String catalog_name = "test_${hivePrefix}_warmup_select"
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """create catalog if not exists ${catalog_name} properties (
+ "type"="hms",
+ 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}'
+ );"""
+ sql """switch ${catalog_name}"""
+ sql """use `tpch1_parquet`"""
+
+ test_basic_warmup()
+ test_warmup_negative_cases()
+ test_warmup_permission(catalog_name)
+
+ sql """drop catalog if exists ${catalog_name}"""
+ }
+}
+