This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new f88c44aaaeb branch-4.0: [feature](file-cache) Implement cache warm up select functionality. (#58315)
f88c44aaaeb is described below

commit f88c44aaaeb8ba193f4efe810940c68b97c54a84
Author: Qi Chen <[email protected]>
AuthorDate: Tue Nov 25 16:11:43 2025 +0800

    branch-4.0: [feature](file-cache) Implement cache warm up select functionality. (#58315)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    ### Release note
    
    Cherry pick #54822 #58312
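
    This patch adds a `WARM UP SELECT` statement that reads data through the
    normal scan path and discards it into a blackhole sink, so the file cache
    is populated without returning rows to the client. A minimal usage sketch
    based on the grammar added in this patch (`hive_catalog.db.t` is a
    hypothetical table name):

    ```sql
    -- Warm up the file cache for the whole table; rows are discarded.
    WARM UP SELECT * FROM hive_catalog.db.t;

    -- Warm up only selected columns and rows.
    WARM UP SELECT id, name FROM hive_catalog.db.t WHERE id > 0;
    ```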
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
 be/src/pipeline/exec/blackhole_sink_operator.cpp   | 112 ++++++++++
 be/src/pipeline/exec/blackhole_sink_operator.h     |  90 ++++++++
 be/src/pipeline/exec/operator.cpp                  |   2 +
 be/src/pipeline/pipeline_fragment_context.cpp      |  11 +
 be/src/runtime/runtime_query_statistics_mgr.cpp    |  20 +-
 be/src/runtime/workload_management/io_context.h    |  10 +
 .../workload_management/resource_context.cpp       |   1 +
 be/src/vec/exec/scan/file_scanner.cpp              |   2 +
 be/src/vec/exec/scan/olap_scanner.cpp              |   2 +
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |   6 +
 .../org/apache/doris/analysis/SchemaTableType.java |   4 +-
 .../java/org/apache/doris/catalog/SchemaTable.java |   7 +
 .../doris/common/profile/ProfileManager.java       |   3 +-
 .../java/org/apache/doris/common/util/Util.java    |  17 +-
 .../nereids/analyzer/UnboundBlackholeSink.java     | 158 ++++++++++++++
 .../glue/translator/PhysicalPlanTranslator.java    |  11 +
 .../doris/nereids/parser/LogicalPlanBuilder.java   |  68 ++++++
 .../nereids/properties/RequestPropertyDeriver.java |   8 +
 .../org/apache/doris/nereids/rules/RuleSet.java    |   2 +
 .../org/apache/doris/nereids/rules/RuleType.java   |   2 +
 .../doris/nereids/rules/analysis/BindSink.java     |  25 ++-
 ...ogicalBlackholeSinkToPhysicalBlackholeSink.java |  43 ++++
 .../apache/doris/nereids/trees/plans/PlanType.java |   5 +
 .../commands/insert/AbstractInsertExecutor.java    |  35 ++++
 .../commands/insert/BlackholeInsertExecutor.java   |  91 ++++++++
 .../commands/insert/InsertIntoTableCommand.java    |  42 +++-
 .../trees/plans/commands/insert/InsertUtils.java   |   3 +
 .../plans/commands/insert/WarmupSelectCommand.java | 228 +++++++++++++++++++++
 .../trees/plans/logical/LogicalBlackholeSink.java  |  88 ++++++++
 .../plans/physical/PhysicalBlackholeSink.java      | 144 +++++++++++++
 .../nereids/trees/plans/visitor/SinkVisitor.java   |  17 ++
 .../org/apache/doris/planner/BlackholeSink.java    |  61 ++++++
 .../WorkloadRuntimeStatusMgr.java                  |  54 +++--
 .../doris/nereids/parser/NereidsParserTest.java    |  27 +++
 gensrc/proto/data.proto                            |   1 +
 gensrc/thrift/DataSinks.thrift                     |   5 +
 gensrc/thrift/Descriptors.thrift                   |   1 +
 gensrc/thrift/FrontendService.thrift               |   9 +-
 .../warm_up_select/test_warmup_select.groovy       | 113 ++++++++++
 .../cache/test_hive_warmup_select.groovy           | 156 ++++++++++++++
 40 files changed, 1646 insertions(+), 38 deletions(-)

diff --git a/be/src/pipeline/exec/blackhole_sink_operator.cpp b/be/src/pipeline/exec/blackhole_sink_operator.cpp
new file mode 100644
index 00000000000..ca23b8b1fbe
--- /dev/null
+++ b/be/src/pipeline/exec/blackhole_sink_operator.cpp
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "blackhole_sink_operator.h"
+
+#include <fmt/format.h>
+#include <gen_cpp/PaloInternalService_types.h>
+
+#include <sstream>
+
+#include "common/logging.h"
+#include "common/status.h"
+#include "pipeline/dependency.h"
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+
+namespace doris {
+namespace pipeline {
+
+BlackholeSinkOperatorX::BlackholeSinkOperatorX(int operator_id) : Base(operator_id, 0, 0) {}
+
+Status BlackholeSinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSinkOperatorXBase::prepare(state));
+    return Status::OK();
+}
+
+Status BlackholeSinkOperatorX::init(const TDataSink& tsink) {
+    RETURN_IF_ERROR(DataSinkOperatorXBase::init(tsink));
+    return Status::OK();
+}
+
+Status BlackholeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) {
+    auto& local_state = get_local_state(state);
+    SCOPED_TIMER(local_state.exec_time_counter());
+    COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
+
+    if (block && block->rows() > 0) {
+        RETURN_IF_ERROR(_process_block(state, block));
+    }
+
+    return Status::OK();
+}
+
+Status BlackholeSinkOperatorX::_process_block(RuntimeState* state, vectorized::Block* block) {
+    auto& local_state = get_local_state(state);
+
+    // Update metrics - count rows and bytes processed
+    local_state._rows_processed += block->rows();
+    local_state._bytes_processed += block->bytes();
+
+    // Update runtime counters
+    if (local_state._rows_processed_timer) {
+        COUNTER_UPDATE(local_state._rows_processed_timer, block->rows());
+    }
+    if (local_state._bytes_processed_timer) {
+        COUNTER_UPDATE(local_state._bytes_processed_timer, block->bytes());
+    }
+
+    // The BLACKHOLE discards the data
+    // We don't write the block anywhere - it's effectively sent to /dev/null
+    return Status::OK();
+}
+
+Status BlackholeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_init_timer);
+
+    // Initialize performance counters
+    _rows_processed_timer = ADD_COUNTER(custom_profile(), "RowsProcessed", TUnit::UNIT);
+    _bytes_processed_timer = ADD_COUNTER(custom_profile(), "BytesProcessed", TUnit::BYTES);
+
+    return Status::OK();
+}
+
+Status BlackholeSinkLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
+
+    return Status::OK();
+}
+
+Status BlackholeSinkLocalState::close(RuntimeState* state, Status exec_status) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_close_timer);
+
+    return Base::close(state, exec_status);
+}
+
+Status BlackholeSinkOperatorX::close(RuntimeState* state) {
+    return Status::OK();
+}
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/exec/blackhole_sink_operator.h b/be/src/pipeline/exec/blackhole_sink_operator.h
new file mode 100644
index 00000000000..1acf6074d3e
--- /dev/null
+++ b/be/src/pipeline/exec/blackhole_sink_operator.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/PlanNodes_types.h>
+#include <gen_cpp/Types_types.h>
+#include <stdint.h>
+
+#include <memory>
+
+#include "operator.h"
+#include "vec/core/block.h"
+
+namespace doris {
+
+class TDataSink;
+
+namespace vectorized {
+class Block;
+}
+
+namespace pipeline {
+
+// Forward declaration
+class BlackholeSinkOperatorX;
+
+class BlackholeSinkLocalState final : public PipelineXSinkLocalState<FakeSharedState> {
+    ENABLE_FACTORY_CREATOR(BlackholeSinkLocalState);
+
+public:
+    using Parent = BlackholeSinkOperatorX;
+    using Base = PipelineXSinkLocalState<FakeSharedState>;
+    BlackholeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state) {}
+
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status open(RuntimeState* state) override;
+    Status close(RuntimeState* state, Status exec_status) override;
+
+    int64_t _rows_processed = 0;
+    int64_t _bytes_processed = 0;
+
+    RuntimeProfile::Counter* _rows_processed_timer = nullptr;
+    RuntimeProfile::Counter* _bytes_processed_timer = nullptr;
+
+private:
+    friend class BlackholeSinkOperatorX;
+};
+
+class BlackholeSinkOperatorX final : public DataSinkOperatorX<BlackholeSinkLocalState> {
+public:
+    using Base = DataSinkOperatorX<BlackholeSinkLocalState>;
+
+    BlackholeSinkOperatorX(int operator_id);
+
+    Status prepare(RuntimeState* state) override;
+
+    Status init(const TDataSink& tsink) override;
+
+    Status sink(RuntimeState* state, vectorized::Block* block, bool eos) override;
+
+    Status close(RuntimeState* state) override;
+
+private:
+    friend class BlackholeSinkLocalState;
+
+    /**
+     * Process a data block by discarding it and collecting metrics.
+     * This simulates a "/dev/null" sink - data goes in but nothing comes out.
+     */
+    Status _process_block(RuntimeState* state, vectorized::Block* block);
+};
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 6f0eb562f0f..2b2123849ea 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -24,6 +24,7 @@
 #include "pipeline/exec/analytic_sink_operator.h"
 #include "pipeline/exec/analytic_source_operator.h"
 #include "pipeline/exec/assert_num_rows_operator.h"
+#include "pipeline/exec/blackhole_sink_operator.h"
 #include "pipeline/exec/cache_sink_operator.h"
 #include "pipeline/exec/cache_source_operator.h"
 #include "pipeline/exec/datagen_operator.h"
@@ -783,6 +784,7 @@ DECLARE_OPERATOR(OlapTableSinkV2LocalState)
 DECLARE_OPERATOR(HiveTableSinkLocalState)
 DECLARE_OPERATOR(IcebergTableSinkLocalState)
 DECLARE_OPERATOR(AnalyticSinkLocalState)
+DECLARE_OPERATOR(BlackholeSinkLocalState)
 DECLARE_OPERATOR(SortSinkLocalState)
 DECLARE_OPERATOR(SpillSortSinkLocalState)
 DECLARE_OPERATOR(LocalExchangeSinkLocalState)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index bd0ce44d78d..38704bf10a0 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -46,6 +46,7 @@
 #include "pipeline/exec/analytic_sink_operator.h"
 #include "pipeline/exec/analytic_source_operator.h"
 #include "pipeline/exec/assert_num_rows_operator.h"
+#include "pipeline/exec/blackhole_sink_operator.h"
 #include "pipeline/exec/cache_sink_operator.h"
 #include "pipeline/exec/cache_source_operator.h"
 #include "pipeline/exec/datagen_operator.h"
@@ -104,6 +105,8 @@
 #include "pipeline_task.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
+#include "runtime/result_block_buffer.h"
+#include "runtime/result_buffer_mgr.h"
 #include "runtime/runtime_state.h"
 #include "runtime/stream_load/new_load_stream_mgr.h"
 #include "runtime/thread_context.h"
@@ -1163,6 +1166,14 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
         }
         break;
     }
+    case TDataSinkType::BLACKHOLE_SINK: {
+        if (!thrift_sink.__isset.blackhole_sink) {
+            return Status::InternalError("Missing blackhole sink.");
+        }
+
+        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
+        break;
+    }
     default:
         return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
     }
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp
index e1f97b8cdc0..b08f8705b6c 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -338,7 +338,8 @@ void RuntimeQueryStatisticsMgr::register_resource_context(
 void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
     int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id;
     // 1 get query statistics map
-    std::map<TNetworkAddress, std::map<std::string, TQueryStatistics>> fe_qs_map;
+    // <fe_addr, <query_id, <query_statistics, is_query_finished>>>
+    std::map<TNetworkAddress, std::map<std::string, std::pair<TQueryStatistics, bool>>> fe_qs_map;
     std::map<std::string, std::pair<bool, bool>> qs_status; // <finished, timeout>
     {
         std::lock_guard<std::shared_mutex> write_lock(_resource_contexts_map_lock);
@@ -364,13 +365,14 @@ void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
                 if (resource_ctx->task_controller()->query_type() != TQueryType::EXTERNAL) {
                     if (fe_qs_map.find(resource_ctx->task_controller()->fe_addr()) ==
                         fe_qs_map.end()) {
-                        std::map<std::string, TQueryStatistics> tmp_map;
+                        std::map<std::string, std::pair<TQueryStatistics, bool>> tmp_map;
                         fe_qs_map[resource_ctx->task_controller()->fe_addr()] = std::move(tmp_map);
                     }
 
                     TQueryStatistics ret_t_qs;
                     resource_ctx->to_thrift_query_statistics(&ret_t_qs);
-                    fe_qs_map.at(resource_ctx->task_controller()->fe_addr())[query_id] = ret_t_qs;
+                    fe_qs_map.at(resource_ctx->task_controller()->fe_addr())[query_id] =
+                            std::make_pair(ret_t_qs, is_query_finished);
                     qs_status[query_id] =
                             std::make_pair(is_query_finished, is_timeout_after_finish);
                 }
@@ -407,7 +409,17 @@ void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
         // 2.2 send report
         TReportWorkloadRuntimeStatusParams report_runtime_params;
         report_runtime_params.__set_backend_id(be_id);
-        report_runtime_params.__set_query_statistics_map(qs_map);
+
+        // Build the query statistics map with TQueryStatisticsResult
+        std::map<std::string, TQueryStatisticsResult> query_stats_result_map;
+        for (const auto& [query_id, query_stats_pair] : qs_map) {
+            TQueryStatisticsResult stats_result;
+            stats_result.__set_statistics(query_stats_pair.first);      // TQueryStatistics
+            stats_result.__set_query_finished(query_stats_pair.second); // is_query_finished
+            query_stats_result_map[query_id] = stats_result;
+        }
+
+        report_runtime_params.__set_query_statistics_result_map(query_stats_result_map);
 
         TReportExecStatusParams params;
         params.__set_report_workload_runtime_status(report_runtime_params);
diff --git a/be/src/runtime/workload_management/io_context.h b/be/src/runtime/workload_management/io_context.h
index f66e8ac6b6d..5c4cf33a3a2 100644
--- a/be/src/runtime/workload_management/io_context.h
+++ b/be/src/runtime/workload_management/io_context.h
@@ -40,6 +40,8 @@ public:
         RuntimeProfile::Counter* scan_bytes_counter_;
         RuntimeProfile::Counter* scan_bytes_from_local_storage_counter_;
         RuntimeProfile::Counter* scan_bytes_from_remote_storage_counter_;
+        RuntimeProfile::Counter* bytes_write_into_cache_counter_;
+
         // number rows returned by query.
         // only set once by result sink when closing.
         RuntimeProfile::Counter* returned_rows_counter_;
@@ -58,6 +60,8 @@ public:
                     ADD_COUNTER(profile_, "ScanBytesFromLocalStorage", TUnit::BYTES);
             scan_bytes_from_remote_storage_counter_ =
                     ADD_COUNTER(profile_, "ScanBytesFromRemoteStorage", TUnit::BYTES);
+            bytes_write_into_cache_counter_ =
+                    ADD_COUNTER(profile_, "BytesWriteIntoCache", TUnit::BYTES);
             returned_rows_counter_ = ADD_COUNTER(profile_, "ReturnedRows", TUnit::UNIT);
             shuffle_send_bytes_counter_ = ADD_COUNTER(profile_, "ShuffleSendBytes", TUnit::BYTES);
             shuffle_send_rows_counter_ =
@@ -86,6 +90,9 @@ public:
     int64_t scan_bytes_from_remote_storage() const {
         return stats_.scan_bytes_from_remote_storage_counter_->value();
     }
+    int64_t bytes_write_into_cache() const {
+        return stats_.bytes_write_into_cache_counter_->value();
+    }
     int64_t returned_rows() const { return stats_.returned_rows_counter_->value(); }
     int64_t shuffle_send_bytes() const { return stats_.shuffle_send_bytes_counter_->value(); }
     int64_t shuffle_send_rows() const { return stats_.shuffle_send_rows_counter_->value(); }
@@ -106,6 +113,9 @@ public:
     void update_scan_bytes_from_remote_storage(int64_t delta) const {
         stats_.scan_bytes_from_remote_storage_counter_->update(delta);
     }
+    void update_bytes_write_into_cache(int64_t delta) const {
+        stats_.bytes_write_into_cache_counter_->update(delta);
+    }
     void update_returned_rows(int64_t delta) const { stats_.returned_rows_counter_->update(delta); }
     void update_shuffle_send_bytes(int64_t delta) const {
         stats_.shuffle_send_bytes_counter_->update(delta);
diff --git a/be/src/runtime/workload_management/resource_context.cpp b/be/src/runtime/workload_management/resource_context.cpp
index 4cab0afb449..c1b0fd2b744 100644
--- a/be/src/runtime/workload_management/resource_context.cpp
+++ b/be/src/runtime/workload_management/resource_context.cpp
@@ -38,6 +38,7 @@ void ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics) c
     statistics->__set_scan_bytes_from_remote_storage(
             io_context()->scan_bytes_from_remote_storage());
     statistics->__set_scan_bytes_from_local_storage(io_context()->scan_bytes_from_local_storage());
+    statistics->__set_bytes_write_into_cache(io_context()->bytes_write_into_cache());
 
     if (workload_group() != nullptr) {
         statistics->__set_workload_group_id(workload_group()->id());
diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp
index 2a34972f05f..2e1731aa36e 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -1810,6 +1810,8 @@ void FileScanner::_collect_profile_before_close() {
         _profile != nullptr) {
         io::FileCacheProfileReporter cache_profile(_profile);
         cache_profile.update(_file_cache_statistics.get());
+        _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
+                _file_cache_statistics->bytes_write_into_cache);
     }
 
     if (_cur_reader != nullptr) {
diff --git a/be/src/vec/exec/scan/olap_scanner.cpp b/be/src/vec/exec/scan/olap_scanner.cpp
index c0c0c35f0be..e60bf769f2a 100644
--- a/be/src/vec/exec/scan/olap_scanner.cpp
+++ b/be/src/vec/exec/scan/olap_scanner.cpp
@@ -777,6 +777,8 @@ void OlapScanner::_collect_profile_before_close() {
     if (config::is_cloud_mode() && config::enable_file_cache) {
         io::FileCacheProfileReporter cache_profile(local_state->_segment_profile.get());
         cache_profile.update(&stats.file_cache_stats);
+        _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
+                stats.file_cache_stats.bytes_write_into_cache);
     }
     COUNTER_UPDATE(local_state->_output_index_result_column_timer,
                    stats.output_index_result_column_timer);
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index ef036f97134..9ca29f37594 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -503,6 +503,8 @@ supportedOtherStatement
         ((CLUSTER | COMPUTE GROUP) source=identifier |
             (warmUpItem (AND warmUpItem)*)) FORCE?
             properties=propertyClause?                                            #warmUpCluster
+    | explain? WARM UP SELECT namedExpressionSeq
+      FROM warmUpSingleTableRef whereClause?                                      #warmUpSelect
     | BACKUP SNAPSHOT label=multipartIdentifier TO repo=identifier
         ((ON | EXCLUDE) LEFT_PAREN baseTableRef (COMMA baseTableRef)* RIGHT_PAREN)?
         properties=propertyClause?                                                #backup
@@ -513,6 +515,10 @@ supportedOtherStatement
     : TABLE tableName=multipartIdentifier (PARTITION partitionName=identifier)?
     ;
 
+warmUpSingleTableRef
+    : multipartIdentifier tableAlias?
+    ;
+
 lockTable
     : name=multipartIdentifier (AS alias=identifierOrText)?
         (READ (LOCAL)? | (LOW_PRIORITY)? WRITE)
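
The rule above admits an optional EXPLAIN prefix, a projection list, a single table reference with an optional alias, and an optional WHERE clause. The parser code further down (LogicalPlanBuilder) additionally restricts the projection to simple column references or `*`. A hedged sketch of accepted and rejected forms (table names hypothetical):

```sql
EXPLAIN WARM UP SELECT id FROM hive_catalog.db.t;      -- EXPLAIN variant
WARM UP SELECT id, name FROM db.t t1 WHERE t1.id > 0;  -- alias and WHERE
-- WARM UP SELECT count(*) FROM db.t;                  -- rejected:
-- "WARM UP SELECT only supports simple column references, ..."
```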
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
index 85f16ab6d15..aafcae4a913 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
@@ -104,7 +104,9 @@ public enum SchemaTableType {
                           TSchemaTableType.SCH_ENCRYPTION_KEYS),
     SCH_CLUSTER_SNAPSHOTS("CLUSTER_SNAPSHOTS", "CLUSTER_SNAPSHOTS", 
TSchemaTableType.SCH_CLUSTER_SNAPSHOTS),
     SCH_CLUSTER_SNAPSHOT_PROPERTIES("CLUSTER_SNAPSHOT_PROPERTIES", "CLUSTER_SNAPSHOT_PROPERTIES",
-            TSchemaTableType.SCH_CLUSTER_SNAPSHOT_PROPERTIES);
+            TSchemaTableType.SCH_CLUSTER_SNAPSHOT_PROPERTIES),
+    SCH_BLACKHOLE("BLACKHOLE", "BLACKHOLE",
+            TSchemaTableType.SCH_BLACKHOLE);
 
     private static final String dbName = "INFORMATION_SCHEMA";
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 37b3fdd8d26..deb8aac86a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -51,6 +51,8 @@ public class SchemaTable extends Table {
     private static final int PRIVILEGE_TYPE_LEN = 64;
     private static final int IS_GRANTABLE_LEN = 3;
 
+    public static final String BLACKHOLE_TABLE_NAME = "blackhole";
+
     // Now we just mock tables, table_privileges, referential_constraints, key_column_usage and routines table
     // Because in MySQL ODBC, these tables are used.
     // TODO(zhaochun): Review some commercial BI to check if we need support where clause in show statement
@@ -689,6 +691,11 @@ public class SchemaTable extends Table {
                             .column("REF_TYPE", 
ScalarType.createVarchar(NAME_CHAR_LEN))
                             .build())
             )
+            .put(BLACKHOLE_TABLE_NAME,
+                    new SchemaTable(SystemIdGenerator.getNextId(), BLACKHOLE_TABLE_NAME, TableType.SCHEMA,
+                            builder().column("VERSION", ScalarType.createType(PrimitiveType.INT))
+                                    .build())
+            )
             .put("encryption_keys",
                     new SchemaTable(SystemIdGenerator.getNextId(), "encryption_keys", TableType.SCHEMA,
                         builder().column("ID", ScalarType.createStringType())
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
index 81bdbbbf005..b9fcc581cf0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
@@ -396,7 +396,8 @@ public class ProfileManager extends MasterDaemon {
                 } else {
                     LOG.warn("Failed to get real-time query stats, id {}, resp 
is {}",
                             queryId, resp == null ? "null" : resp.toString());
-                    throw new Exception("Failed to get realtime query stats: " + resp.toString());
+                    throw new Exception("Failed to get realtime query stats: "
+                            + (resp == null ? "null" : resp.toString()));
                 }
             } catch (Exception e) {
                 LOG.warn("Failed to get real-time query stats, id {}, error: 
{}", queryId, e.getMessage(), e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
index dcde71a7a6a..4ac9341af96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
@@ -654,16 +654,19 @@ public class Util {
 
     public static String getRootCauseMessage(Throwable t) {
         String rootCause = "unknown";
+        if (t == null) {
+            return rootCause;
+        }
         Throwable p = t;
-        while (p != null) {
-            String message = p.getMessage();
-            if (message == null) {
-                rootCause = p.getClass().getName();
-            } else {
-                rootCause = p.getClass().getName() + ": " + p.getMessage();
-            }
+        while (p.getCause() != null) {
             p = p.getCause();
         }
+        String message = p.getMessage();
+        if (message == null) {
+            rootCause = p.getClass().getName();
+        } else {
+            rootCause = p.getClass().getName() + ": " + message;
+        }
         return rootCause;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundBlackholeSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundBlackholeSink.java
new file mode 100644
index 00000000000..4a3b76820e5
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundBlackholeSink.java
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.analyzer;
+
+import org.apache.doris.catalog.InfoSchemaDb;
+import org.apache.doris.catalog.SchemaTable;
+import org.apache.doris.nereids.exceptions.UnboundException;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.BlockFuncDepsPropagation;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.Sink;
+import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Unbound black hole sink.
+ * The blackhole sink is currently used in "warm up select" SQL statements to preload table data into the file block cache.
+ * It is planned as a terminal sink (like /dev/null),
+ * meaning a "black hole" at the end of the execution plan that discards all incoming data.
+ */
+public class UnboundBlackholeSink<CHILD_TYPE extends Plan> extends UnboundLogicalSink<CHILD_TYPE>
+        implements Unbound, Sink, BlockFuncDepsPropagation {
+
+    /**
+     * UnboundBlackholeSink Context
+     */
+    public static class UnboundBlackholeSinkContext {
+        private boolean isForWarmUp = false;
+
+        public UnboundBlackholeSinkContext(boolean isForWarmUp) {
+            this.isForWarmUp = isForWarmUp;
+        }
+
+        public boolean isForWarmUp() {
+            return isForWarmUp;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            UnboundBlackholeSinkContext that = (UnboundBlackholeSinkContext) o;
+            return isForWarmUp == that.isForWarmUp;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(isForWarmUp);
+        }
+    }
+
+    private UnboundBlackholeSinkContext context;
+
+    /**
+     * create unbound sink for blackhole sink
+     */
+    public UnboundBlackholeSink(CHILD_TYPE child, UnboundBlackholeSinkContext context) {
+        super(ImmutableList.of(InfoSchemaDb.DATABASE_NAME, SchemaTable.BLACKHOLE_TABLE_NAME),
+                PlanType.LOGICAL_UNBOUND_BLACKHOLE_SINK,
+                ImmutableList.of(),
+                Optional.empty(),
+                Optional.empty(),
+                ImmutableList.of(),
+                DMLCommandType.INSERT,
+                child);
+        this.context = context;
+    }
+
+    /**
+     * create unbound sink for blackhole sink
+     */
+    public UnboundBlackholeSink(Optional<GroupExpression> groupExpression,
+            Optional<LogicalProperties> logicalProperties, CHILD_TYPE child, UnboundBlackholeSinkContext context) {
+        super(ImmutableList.of(InfoSchemaDb.DATABASE_NAME, SchemaTable.BLACKHOLE_TABLE_NAME),
+                PlanType.LOGICAL_UNBOUND_BLACKHOLE_SINK,
+                ImmutableList.of(),
+                groupExpression,
+                logicalProperties,
+                ImmutableList.of(),
+                DMLCommandType.INSERT,
+                child);
+        this.context = context;
+    }
+
+    public UnboundBlackholeSinkContext getContext() {
+        return context;
+    }
+
+    @Override
+    public Plan withChildren(List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1, "UnboundBlackholeSink only accepts one child");
+        return new UnboundBlackholeSink<>(groupExpression, Optional.empty(), children.get(0), context);
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitUnboundBlackholeSink(this, context);
+    }
+
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
+        return new UnboundBlackholeSink<>(groupExpression, Optional.of(getLogicalProperties()), child(), context);
+    }
+
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1, "UnboundBlackholeSink only accepts one child");
+        return new UnboundBlackholeSink<>(groupExpression, logicalProperties, children.get(0), context);
+    }
+
+    @Override
+    public UnboundBlackholeSink<CHILD_TYPE> withOutputExprs(List<NamedExpression> outputExprs) {
+        throw new UnboundException("could not call withOutputExprs on UnboundBlackholeSink");
+    }
+
+    @Override
+    public List<Slot> computeOutput() {
+        throw new UnboundException("output");
+    }
+
+    @Override
+    public String toString() {
+        return Utils.toSqlString("UnboundBlackholeSink[" + id.asInt() + "]");
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index a6a2d6cc08c..8a8a342ccf0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -121,6 +121,7 @@ import org.apache.doris.nereids.trees.plans.algebra.Relation;
 import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
 import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalBlackholeSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
@@ -182,6 +183,7 @@ import org.apache.doris.planner.AggregationNode;
 import org.apache.doris.planner.AnalyticEvalNode;
 import org.apache.doris.planner.AssertNumRowsNode;
 import org.apache.doris.planner.BackendPartitionedSchemaScanNode;
+import org.apache.doris.planner.BlackholeSink;
 import org.apache.doris.planner.CTEScanNode;
 import org.apache.doris.planner.DataPartition;
 import org.apache.doris.planner.DataStreamSink;
@@ -443,6 +445,15 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
      * sink Node, in lexicographical order
      * ******************************************************************************************** */
 
+    @Override
+    public PlanFragment visitPhysicalBlackholeSink(PhysicalBlackholeSink<? extends Plan> physicalBlackholeSink,
+            PlanTranslatorContext context) {
+        PlanFragment planFragment = physicalBlackholeSink.child().accept(this, context);
+        BlackholeSink blackholeSink = new BlackholeSink(planFragment.getPlanRoot().getId());
+        planFragment.setSink(blackholeSink);
+        return planFragment;
+    }
+
     @Override
     public PlanFragment visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink,
             PlanTranslatorContext context) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index c1e26372cd0..98caca0ce37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -477,6 +477,8 @@ import org.apache.doris.nereids.DorisParser.WithRemoteStorageSystemContext;
 import org.apache.doris.nereids.DorisParserBaseVisitor;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundBlackholeSink;
+import org.apache.doris.nereids.analyzer.UnboundBlackholeSink.UnboundBlackholeSinkContext;
 import org.apache.doris.nereids.analyzer.UnboundFunction;
 import org.apache.doris.nereids.analyzer.UnboundInlineTable;
 import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
@@ -987,6 +989,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.WarmUpItem;
 import org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.insert.WarmupSelectCommand;
 import org.apache.doris.nereids.trees.plans.commands.load.CreateRoutineLoadCommand;
 import org.apache.doris.nereids.trees.plans.commands.load.LoadColumnClause;
 import org.apache.doris.nereids.trees.plans.commands.load.LoadColumnDesc;
@@ -8736,6 +8739,71 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
         return new WarmUpClusterCommand(warmUpItems, srcCluster, dstCluster, isForce, isWarmUpWithTable, properties);
     }
 
+    @Override
+    public LogicalPlan visitWarmUpSelect(DorisParser.WarmUpSelectContext ctx) {
+        LogicalPlan relation = visitWarmUpSingleTableRef(ctx.warmUpSingleTableRef());
+
+        LogicalPlan filter = withFilter(relation, Optional.ofNullable(ctx.whereClause()));
+
+        List<Expression> projectList = visitNamedExpressionSeq(ctx.namedExpressionSeq());
+
+        for (Expression expr : projectList) {
+            if (!isSimpleColumnReference(expr)) {
+                throw new AnalysisException("WARM UP SELECT only supports simple column references, "
+                        + "aggregate functions and complex expressions are not allowed");
+            }
+        }
+
+        LogicalProject project = new LogicalProject(projectList, filter);
+
+        if (Config.isNotCloudMode() && (!ConnectContext.get().getSessionVariable().isEnableFileCache())) {
+            throw new AnalysisException("WARM UP SELECT requires session variable"
+                    + " enable_file_cache=true");
+        }
+
+        if (Config.isCloudMode() && ConnectContext.get().getSessionVariable().isDisableFileCache()) {
+            throw new AnalysisException("WARM UP SELECT requires session variable"
+                    + " disable_file_cache=false in cloud mode");
+        }
+
+        UnboundBlackholeSink<?> sink = new UnboundBlackholeSink<>(project,
+                new UnboundBlackholeSinkContext(true));
+        LogicalPlan command = new WarmupSelectCommand(sink);
+        return withExplain(command, ctx.explain());
+    }
+
+    @Override
+    public LogicalPlan visitWarmUpSingleTableRef(DorisParser.WarmUpSingleTableRefContext ctx) {
+        List<String> nameParts = visitMultipartIdentifier(ctx.multipartIdentifier());
+
+        // Create a simple UnboundRelation for warm up queries
+        UnboundRelation relation = new UnboundRelation(
+                StatementScopeIdGenerator.newRelationId(),
+                nameParts);
+
+        LogicalPlan checkedRelation = LogicalPlanBuilderAssistant.withCheckPolicy(relation);
+        LogicalPlan plan = withTableAlias(checkedRelation, ctx.tableAlias());
+        return plan;
+    }
+
+    /**
+     * Check if an expression is a simple column reference (not aggregate functions or complex expressions)
+     */
+    private boolean isSimpleColumnReference(Expression expr) {
+        // Allow simple column references
+        if (expr instanceof Slot) {
+            return true;
+        }
+
+        // Allow star expressions (*)
+        if (expr instanceof UnboundStar) {
+            return true;
+        }
+
+        // Reject everything else (including function calls, arithmetic expressions, etc.)
+        return false;
+    }
+
     @Override
     public LogicalPlan visitCreateResource(DorisParser.CreateResourceContext ctx) {
         String resourceName = visitIdentifierOrText(ctx.name);
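
Per the checks in visitWarmUpSelect above, the file cache must be enabled before the statement is analyzed. A hedged sketch of the prerequisites, with the variable names taken from the error messages in this patch:

```sql
-- Non-cloud mode: enable_file_cache must be true in the session.
SET enable_file_cache = true;
WARM UP SELECT * FROM hive_catalog.db.t;

-- Cloud mode: disable_file_cache must be false, otherwise the statement
-- is rejected at analysis time.
SET disable_file_cache = false;
WARM UP SELECT * FROM db.t;
```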
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
index 05ed2760307..2322b39a62b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
@@ -38,6 +38,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.algebra.SetOperation;
 import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalBlackholeSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalDictionarySink;
@@ -131,6 +132,13 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
      * sink Node, in lexicographical order
      * ******************************************************************************************** */
 
+    @Override
+    public Void visitPhysicalBlackholeSink(PhysicalBlackholeSink<? extends Plan> sink, PlanContext context) {
+        // The blackhole sink accepts any data distribution, so it can run in parallel instances
+        addRequestPropertyToChildren(PhysicalProperties.ANY);
+        return null;
+    }
+
     @Override
     public Void visitPhysicalOlapTableSink(PhysicalOlapTableSink<? extends Plan> olapTableSink, PlanContext context) {
         if (connectContext != null && !connectContext.getSessionVariable().enableStrictConsistencyDml) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
index f361348ccad..15d67d2e696 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
@@ -53,6 +53,7 @@ import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewWindowScanR
 import org.apache.doris.nereids.rules.expression.ExpressionNormalizationAndOptimization;
 import org.apache.doris.nereids.rules.implementation.AggregateStrategies;
 import org.apache.doris.nereids.rules.implementation.LogicalAssertNumRowsToPhysicalAssertNumRows;
+import org.apache.doris.nereids.rules.implementation.LogicalBlackholeSinkToPhysicalBlackholeSink;
 import org.apache.doris.nereids.rules.implementation.LogicalCTEAnchorToPhysicalCTEAnchor;
 import org.apache.doris.nereids.rules.implementation.LogicalCTEConsumerToPhysicalCTEConsumer;
 import org.apache.doris.nereids.rules.implementation.LogicalCTEProducerToPhysicalCTEProducer;
@@ -223,6 +224,7 @@ public class RuleSet {
             .add(new LogicalResultSinkToPhysicalResultSink())
             .add(new LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink())
             .add(new LogicalDictionarySinkToPhysicalDictionarySink())
+            .add(new LogicalBlackholeSinkToPhysicalBlackholeSink())
             .build();
 
     // left-zig-zag tree is used when column stats are not available.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 6e275ec8263..87f9da44f3b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -35,6 +35,7 @@ public enum RuleType {
 
     // binding rules
     BINDING_RESULT_SINK(RuleTypeClass.REWRITE),
+    BINDING_INSERT_BLACKHOLE_SINK(RuleTypeClass.REWRITE),
     BINDING_INSERT_HIVE_TABLE(RuleTypeClass.REWRITE),
     BINDING_INSERT_ICEBERG_TABLE(RuleTypeClass.REWRITE),
     BINDING_INSERT_JDBC_TABLE(RuleTypeClass.REWRITE),
@@ -518,6 +519,7 @@ public enum RuleType {
     LOGICAL_JDBC_SCAN_TO_PHYSICAL_JDBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_ODBC_SCAN_TO_PHYSICAL_ODBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
+    LOGICAL_BLACKHOLE_SINK_TO_PHYSICAL_BLACKHOLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_ICEBERG_TABLE_SINK_TO_PHYSICAL_ICEBERG_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
index bb75f846d51..34462634f44 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.MaterializedIndexMeta;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.IdGenerator;
 import org.apache.doris.common.Pair;
 import org.apache.doris.datasource.hive.HMSExternalDatabase;
@@ -40,6 +41,7 @@ import org.apache.doris.dictionary.Dictionary;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.analyzer.Scope;
+import org.apache.doris.nereids.analyzer.UnboundBlackholeSink;
 import org.apache.doris.nereids.analyzer.UnboundDictionarySink;
 import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
 import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
@@ -68,11 +70,13 @@ import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
 import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalBlackholeSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalDictionarySink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
@@ -142,8 +146,9 @@ public class BindSink implements AnalysisRuleFactory {
                     unboundIcebergTableSink().thenApply(this::bindIcebergTableSink)),
                 RuleType.BINDING_INSERT_JDBC_TABLE.build(unboundJdbcTableSink().thenApply(this::bindJdbcTableSink)),
                 RuleType.BINDING_INSERT_DICTIONARY_TABLE
-                        .build(unboundDictionarySink().thenApply(this::bindDictionarySink))
-        );
+                        .build(unboundDictionarySink().thenApply(this::bindDictionarySink)),
+                RuleType.BINDING_INSERT_BLACKHOLE_SINK.build(unboundBlackholeSink().thenApply(this::bindBlackHoleSink))
+                );
     }
 
     private Plan bindOlapTableSink(MatchingContext<UnboundTableSink<Plan>> ctx) {
@@ -508,6 +513,22 @@ public class BindSink implements AnalysisRuleFactory {
         return columnToOutput;
     }
 
+    private Plan bindBlackHoleSink(MatchingContext<UnboundBlackholeSink<Plan>> ctx) {
+        UnboundBlackholeSink<?> sink = ctx.root;
+        LogicalPlan child = ((LogicalPlan) sink.child());
+        if (sink.getContext().isForWarmUp() && Config.isNotCloudMode() && child.containsType(LogicalOlapScan.class)) {
+            throw new AnalysisException("WARM UP SELECT doesn't support olap table in non-cloud mode.");
+        }
+        LogicalBlackholeSink<?> boundSink = new LogicalBlackholeSink<>(
+                child.getOutput().stream()
+                        .map(NamedExpression.class::cast)
+                        .collect(ImmutableList.toImmutableList()),
+                Optional.empty(),
+                Optional.empty(),
+                child);
+        return boundSink;
+    }
+
     private Plan bindHiveTableSink(MatchingContext<UnboundHiveTableSink<Plan>> ctx) {
         UnboundHiveTableSink<?> sink = ctx.root;
         Pair<HMSExternalDatabase, HMSExternalTable> pair = bind(ctx.cascadesContext, sink);
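
The bind rule above also rejects warm-up over OLAP tables outside cloud mode. A hedged sketch of the resulting behavior (table names hypothetical):

```sql
-- Non-cloud mode, external table: accepted.
WARM UP SELECT * FROM hive_catalog.db.external_t;

-- Non-cloud mode, OLAP table: rejected at bind time with
-- "WARM UP SELECT doesn't support olap table in non-cloud mode."
-- WARM UP SELECT * FROM internal.db.olap_t;
```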
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalBlackholeSinkToPhysicalBlackholeSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalBlackholeSinkToPhysicalBlackholeSink.java
new file mode 100644
index 00000000000..389e914a696
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalBlackholeSinkToPhysicalBlackholeSink.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalBlackholeSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalBlackholeSink;
+
+import java.util.Optional;
+
+/**
+ * implement blackhole sink.
+ */
+public class LogicalBlackholeSinkToPhysicalBlackholeSink extends OneImplementationRuleFactory {
+    @Override
+    public Rule build() {
+        return logicalBlackholeSink().thenApply(ctx -> {
+            LogicalBlackholeSink<? extends Plan> sink = ctx.root;
+            return new PhysicalBlackholeSink<>(
+                    sink.getOutputExprs(),
+                    Optional.empty(),
+                    sink.getLogicalProperties(),
+                    sink.child());
+        }).toRule(RuleType.LOGICAL_BLACKHOLE_SINK_TO_PHYSICAL_BLACKHOLE_SINK_RULE);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index f2e88c1df7d..181b7e4083a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -52,6 +52,7 @@ public enum PlanType {
     LOGICAL_ICEBERG_TABLE_SINK,
     LOGICAL_JDBC_TABLE_SINK,
     LOGICAL_RESULT_SINK,
+    LOGICAL_BLACKHOLE_SINK,
     LOGICAL_DICTIONARY_SINK,
     LOGICAL_UNBOUND_OLAP_TABLE_SINK,
     LOGICAL_UNBOUND_HIVE_TABLE_SINK,
@@ -59,6 +60,7 @@ public enum PlanType {
     LOGICAL_UNBOUND_JDBC_TABLE_SINK,
     LOGICAL_UNBOUND_RESULT_SINK,
     LOGICAL_UNBOUND_DICTIONARY_SINK,
+    LOGICAL_UNBOUND_BLACKHOLE_SINK,
 
     // logical others
     LOGICAL_AGGREGATE,
@@ -113,6 +115,7 @@ public enum PlanType {
     PHYSICAL_ICEBERG_TABLE_SINK,
     PHYSICAL_JDBC_TABLE_SINK,
     PHYSICAL_RESULT_SINK,
+    PHYSICAL_BLACKHOLE_SINK,
     PHYSICAL_DICTIONARY_SINK,
 
     // physical others
@@ -153,6 +156,8 @@ public enum PlanType {
     EXPORT_COMMAND,
     INSERT_INTO_TABLE_COMMAND,
     INSERT_INTO_DICTIONARY_COMMAND,
+    INSERT_INTO_BLACKHOLE_COMMAND,
+    WARMUP_SELECT_COMMAND,
     BATCH_INSERT_INTO_TABLE_COMMAND,
     INSERT_OVERWRITE_TABLE_COMMAND,
     LOAD_COMMAND,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index f69a868182d..f1afbd135ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -44,7 +44,9 @@ import org.apache.doris.thrift.TStatusCode;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
  * Abstract insert executor.
@@ -70,6 +72,25 @@ public abstract class AbstractInsertExecutor {
     protected final boolean emptyInsert;
     protected long txnId = INVALID_TXN_ID;
 
+    /**
+     * Insert executor listener
+     */
+    public interface InsertExecutorListener {
+        /**
+         * Called after the insert work finishes, just before onComplete() runs.
+         */
+        default void beforeComplete(AbstractInsertExecutor insertExecutor, StmtExecutor executor, long jobId)
+                throws Exception {
+        }
+
+        /**
+         * Called after onComplete() has finished.
+         */
+        default void afterComplete(AbstractInsertExecutor insertExecutor, StmtExecutor executor, long jobId)
+                throws Exception {
+        }
+    }
+
+    private List<InsertExecutorListener> listeners = new CopyOnWriteArrayList<>();
+
     /**
      * constructor
      */
@@ -91,6 +112,14 @@ public abstract class AbstractInsertExecutor {
         this.jobId = jobId;
     }
 
+    public void registerListener(InsertExecutorListener listener) {
+        listeners.add(listener);
+    }
+
+    public void unregisterListener(InsertExecutorListener listener) {
+        listeners.remove(listener);
+    }
+
     public Coordinator getCoordinator() {
         return coordinator;
     }
@@ -208,7 +237,13 @@ public abstract class AbstractInsertExecutor {
             executor.updateProfile(false);
             execImpl(executor);
             checkStrictModeAndFilterRatio();
+            for (InsertExecutorListener listener : listeners) {
+                listener.beforeComplete(this, executor, jobId);
+            }
             onComplete();
+            for (InsertExecutorListener listener : listeners) {
+                listener.afterComplete(this, executor, jobId);
+            }
         } catch (Throwable t) {
             onFail(t);
             // retry insert into from select when meet E-230 in cloud
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BlackholeInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BlackholeInsertExecutor.java
new file mode 100644
index 00000000000..b2ef9c19efa
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BlackholeInsertExecutor.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
+import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.StmtExecutor;
+
+import com.google.common.base.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Optional;
+
+/**
+ * Insert executor for blackhole table
+ */
+public class BlackholeInsertExecutor extends AbstractInsertExecutor {
+    private static final Logger LOG = LogManager.getLogger(BlackholeInsertExecutor.class);
+
+    /**
+     * constructor
+     */
+    public BlackholeInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner,
+            Optional<InsertCommandContext> insertCtx, boolean emptyInsert, long jobId) {
+        super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId);
+    }
+
+    @Override
+    public void beginTransaction() {
+        // the blackhole sink writes no data, so there is no transaction to begin
+    }
+
+    @Override
+    protected void beforeExec() throws UserException {
+        // do nothing
+    }
+
+    @Override
+    protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
+        // do nothing
+    }
+
+    @Override
+    protected void onComplete() {
+        if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) {
+            LOG.warn("blackhole insert failed. label: {}, error: {}",
+                    labelName, coordinator.getExecStatus().getErrorMsg());
+        }
+    }
+
+    @Override
+    protected void onFail(Throwable t) {
+        errMsg = t.getMessage() == null ? "unknown reason" : Util.getRootCauseMessage(t);
+        String queryId = DebugUtil.printId(ctx.queryId());
+        LOG.warn("blackhole insert [{}] with query id {} failed", labelName, queryId);
+        StringBuilder sb = new StringBuilder(errMsg);
+        if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) {
+            sb.append(". url: ").append(coordinator.getTrackingUrl());
+        }
+        ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString());
+    }
+
+    @Override
+    protected void afterExec(StmtExecutor executor) {
+        // do nothing
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index f2d9d28153f..3ac25f371ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -55,8 +55,10 @@ import org.apache.doris.nereids.trees.plans.commands.Command;
 import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
 import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
 import org.apache.doris.nereids.trees.plans.commands.NeedAuditEncryption;
+import org.apache.doris.nereids.trees.plans.commands.insert.AbstractInsertExecutor.InsertExecutorListener;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalBlackholeSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalDictionarySink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
@@ -117,18 +119,20 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
     private final Optional<LogicalPlan> cte;
     private final boolean needNormalizePlan;
 
+    private InsertExecutorListener insertExecutorListener;
+
     public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> labelName,
             Optional<InsertCommandContext> insertCtx, Optional<LogicalPlan> cte) {
-        this(logicalQuery, labelName, insertCtx, cte, true, Optional.empty());
+        this(PlanType.INSERT_INTO_TABLE_COMMAND, logicalQuery, labelName, insertCtx, cte, true, Optional.empty());
     }
 
     /**
      * constructor
      */
-    public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> labelName,
+    public InsertIntoTableCommand(PlanType planType, LogicalPlan logicalQuery, Optional<String> labelName,
                                   Optional<InsertCommandContext> insertCtx, Optional<LogicalPlan> cte,
                                   boolean needNormalizePlan, Optional<String> branchName) {
-        super(PlanType.INSERT_INTO_TABLE_COMMAND);
+        super(planType);
         this.originLogicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery should not be null");
         this.labelName = Objects.requireNonNull(labelName, "labelName should not be null");
         this.logicalQuery = Optional.empty();
@@ -143,6 +147,13 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
         }
     }
 
+    public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> labelName,
+            Optional<InsertCommandContext> insertCtx, Optional<LogicalPlan> cte,
+            boolean needNormalizePlan, Optional<String> branchName) {
+        this(PlanType.INSERT_INTO_TABLE_COMMAND, logicalQuery, labelName, insertCtx, cte,
+                needNormalizePlan, branchName);
+    }
+
     /**
      * constructor for derived class
      */
@@ -190,6 +201,10 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
         this.jobId = jobId;
     }
 
+    public void setInsertExecutorListener(InsertExecutorListener insertExecutorListener) {
+        this.insertExecutorListener = insertExecutorListener;
+    }
+
     @Override
     public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
         runInternal(ctx, executor);
@@ -229,7 +244,7 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
         while (++retryTimes < Math.max(ctx.getSessionVariable().dmlPlanRetryTimes, 3)) {
             TableIf targetTableIf = getTargetTableIf(ctx, qualifiedTargetTableName);
             // check auth
-            if (!Env.getCurrentEnv().getAccessManager()
+            if (needAuthCheck(targetTableIf) && !Env.getCurrentEnv().getAccessManager()
                     .checkTblPriv(ConnectContext.get(), targetTableIf.getDatabase().getCatalog().getName(),
                             targetTableIf.getDatabase().getFullName(), targetTableIf.getName(),
                             PrivPredicate.LOAD)) {
@@ -299,6 +314,16 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
         throw new AnalysisException("Insert plan failed. Could not get target table lock.");
     }
 
+    /**
+     * Hook method to determine if auth check is needed.
+     * Subclasses can override this to skip the auth check, e.g., WarmupSelectCommand.
+     * @param targetTableIf the target table
+     * @return true if auth check is needed, false otherwise
+     */
+    protected boolean needAuthCheck(TableIf targetTableIf) {
+        return true;
+    }
+
     private BuildInsertExecutorResult initPlanOnce(ConnectContext ctx,
             StmtExecutor stmtExecutor, TableIf targetTableIf) throws Throwable {
         targetTableIf.readLock();
@@ -462,6 +487,12 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
                 return ExecutorFactory.from(planner, dataSink, physicalSink,
                         () -> new DictionaryInsertExecutor(
                                 ctx, dictionary, label, planner, insertCtx, emptyInsert, jobId));
+            } else if (physicalSink instanceof PhysicalBlackholeSink) {
+                boolean emptyInsert = childIsEmptyRelation(physicalSink);
+                // insertCtx is not used by the blackhole sink, so keeping it empty is fine.
+                return ExecutorFactory.from(planner, dataSink, physicalSink,
+                        () -> new BlackholeInsertExecutor(ctx, targetTableIf, label, planner, insertCtx, emptyInsert,
+                                jobId));
             } else {
                 // TODO: support other table types
                 throw new AnalysisException(
@@ -540,6 +571,9 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
         if (insertExecutor.isEmptyInsert()) {
             return;
         }
+        if (insertExecutorListener != null) {
+            insertExecutor.registerListener(insertExecutorListener);
+        }
         insertExecutor.executeSingleInsert(executor);
     }
 
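
needAuthCheck() is the extension point that WarmupSelectCommand (added later in this patch) uses to bypass the LOAD privilege check on its blackhole target. A minimal sketch of such a subclass, assuming it is constructed like a plain insert (SkipLoadAuthCommand is a hypothetical name used only for illustration):

    // Hypothetical subclass that skips the LOAD privilege check on the target table.
    public class SkipLoadAuthCommand extends InsertIntoTableCommand {
        public SkipLoadAuthCommand(LogicalPlan logicalQuery) {
            super(logicalQuery, Optional.empty(), Optional.empty(), Optional.empty());
        }

        @Override
        protected boolean needAuthCheck(TableIf targetTableIf) {
            // SELECT privileges on the source tables are still enforced by the Nereids analyzer
            return false;
        }
    }
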
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
index 077e57daaa9..1cfef22a10e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
@@ -33,6 +33,7 @@ import org.apache.doris.datasource.jdbc.JdbcExternalTable;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.analyzer.Scope;
 import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundBlackholeSink;
 import org.apache.doris.nereids.analyzer.UnboundDictionarySink;
 import org.apache.doris.nereids.analyzer.UnboundFunction;
 import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
@@ -578,6 +579,8 @@ public class InsertUtils {
             unboundTableSink = (UnboundJdbcTableSink<? extends Plan>) plan;
         } else if (plan instanceof UnboundDictionarySink) {
             unboundTableSink = (UnboundDictionarySink<? extends Plan>) plan;
+        } else if (plan instanceof UnboundBlackholeSink) {
+            unboundTableSink = (UnboundBlackholeSink<? extends Plan>) plan;
         } else {
             throw new AnalysisException(
                     "the root of plan only accept Olap, Dictionary, Hive, 
Iceberg or Jdbc table sink, but it is "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/WarmupSelectCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/WarmupSelectCommand.java
new file mode 100644
index 00000000000..e997f362b0e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/WarmupSelectCommand.java
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.insert.AbstractInsertExecutor.InsertExecutorListener;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.qe.CoordInterface;
+import org.apache.doris.qe.QeProcessorImpl;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.ShowResultSet;
+import org.apache.doris.qe.ShowResultSetMetaData;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.TQueryStatistics;
+import org.apache.doris.thrift.TQueryStatisticsResult;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * File cache warm up select command, such as "warm up select * from table where ..."
+ */
+public class WarmupSelectCommand extends InsertIntoTableCommand {
+
+    private static final long QUERY_STATISTICS_TIMEOUT_MS = 5000; // 5 seconds
+    private static final long QUERY_STATISTICS_INTERVAL_MS = 1000; // 1 second
+
+    // Result set metadata: one row of statistics per BE, plus a TOTAL row
+    private static final ShowResultSetMetaData metaData = ShowResultSetMetaData.builder()
+            .addColumn(new Column("BackendId", ScalarType.createVarchar(20)))
+            .addColumn(new Column("ScanRows", ScalarType.createType(PrimitiveType.BIGINT)))
+            .addColumn(new Column("ScanBytes", ScalarType.createType(PrimitiveType.BIGINT)))
+            .addColumn(new Column("ScanBytesFromLocalStorage", ScalarType.createType(PrimitiveType.BIGINT)))
+            .addColumn(new Column("ScanBytesFromRemoteStorage", ScalarType.createType(PrimitiveType.BIGINT)))
+            .addColumn(new Column("BytesWriteIntoCache", ScalarType.createType(PrimitiveType.BIGINT)))
+            .build();
+
+    /**
+     * WarmupSelectCommand constructor
+     */
+    public WarmupSelectCommand(LogicalPlan logicalQuery) {
+        super(PlanType.INSERT_INTO_BLACKHOLE_COMMAND, logicalQuery, Optional.empty(), Optional.empty(),
+                Optional.empty(), true, Optional.empty());
+        super.setInsertExecutorListener(new InsertExecutorListener() {
+            @Override
+            public void beforeComplete(AbstractInsertExecutor insertExecutor, StmtExecutor executor, long jobId)
+                    throws Exception {
+                // no-op
+            }
+
+            @Override
+            public void afterComplete(AbstractInsertExecutor insertExecutor, StmtExecutor executor, long jobId)
+                    throws Exception {
+                if (insertExecutor.ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) {
+                    return;
+                }
+                Map<Long, TQueryStatisticsResult> statistics = pollForQueryStatistics(insertExecutor.ctx.queryId(),
+                        QUERY_STATISTICS_TIMEOUT_MS, QUERY_STATISTICS_INTERVAL_MS);
+                if (statistics != null) {
+                    sendAggregatedBlackholeResults(statistics, executor);
+                }
+            }
+        });
+    }
+
+    /**
+     * Poll for query statistics until the query is finished or the timeout is reached.
+     * Keep the latest stats from each BE until all BEs have reported isFinished or the overall timeout expires.
+     *
+     * @param queryId The query ID to poll for
+     * @param timeoutMs Maximum time to wait in milliseconds
+     * @param intervalMs Time between polls in milliseconds
+     * @return Map of backend ID to query statistics result
+     */
+    private Map<Long, TQueryStatisticsResult> pollForQueryStatistics(TUniqueId queryId,
+                                                                     long timeoutMs,
+                                                                     long intervalMs) {
+        long startTime = System.currentTimeMillis();
+        Map<Long, TQueryStatisticsResult> beIdToStatisticsResult;
+
+        // Get the coordinator to know how many backends are involved
+        CoordInterface coor = QeProcessorImpl.INSTANCE.getCoordinator(queryId);
+        if (coor == null) {
+            return null;
+        }
+        int expectedBackendCount = coor.getInvolvedBackends().size();
+
+        // Track which backends have finished and store the latest statistics for each backend
+        Set<Long> finishedBackends = Sets.newHashSet();
+        Map<Long, TQueryStatisticsResult> latestStatisticsResult = Maps.newHashMap();
+
+        // Continue polling until all backends are finished or the timeout expires
+        while (System.currentTimeMillis() - startTime < timeoutMs && finishedBackends.size() < expectedBackendCount) {
+            // Get current statistics
+            beIdToStatisticsResult = Env.getCurrentEnv().getWorkloadRuntimeStatusMgr()
+                    .getQueryStatistics(DebugUtil.printId(queryId));
+
+            // Update the latest statistics for each backend and check which ones have finished
+            if (beIdToStatisticsResult != null && !beIdToStatisticsResult.isEmpty()) {
+                for (Map.Entry<Long, TQueryStatisticsResult> entry : beIdToStatisticsResult.entrySet()) {
+                    Long beId = entry.getKey();
+                    TQueryStatisticsResult result = entry.getValue();
+
+                    // Always update the latest statistics for this backend
+                    latestStatisticsResult.put(beId, result);
+
+                    // If this backend has finished, mark it as finished
+                    if (result.isSetQueryFinished() && result.isQueryFinished()) {
+                        finishedBackends.add(beId);
+                    }
+                }
+            }
+
+            // If not all backends have finished, continue polling
+            if (finishedBackends.size() < expectedBackendCount) {
+                // Sleep before next poll
+                try {
+                    Thread.sleep(intervalMs);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        }
+
+        return latestStatisticsResult;
+    }
+
+    /**
+     * Send aggregated blackhole results to the client.
+     * Aggregates statistics from all backends and sends a summary result set.
+     *
+     * @param statisticsResult Map of backend ID to query statistics result
+     * @param executor The statement executor to send results through
+     * @throws IOException If sending the result set fails
+     */
+    public void sendAggregatedBlackholeResults(Map<Long, TQueryStatisticsResult> statisticsResult,
+            StmtExecutor executor) throws IOException {
+        List<List<String>> rows = Lists.newArrayList();
+        long totalScanRows = 0;
+        long totalScanBytes = 0;
+        long totalScanBytesFromLocalStorage = 0;
+        long totalScanBytesFromRemoteStorage = 0;
+        long totalBytesWriteIntoCache = 0;
+
+        // Add a row for each BE with its aggregated data
+        for (Map.Entry<Long, TQueryStatisticsResult> entry : statisticsResult.entrySet()) {
+            Long beId = entry.getKey();
+            TQueryStatisticsResult data = entry.getValue();
+            if (!data.isSetStatistics()) {
+                continue;
+            }
+            TQueryStatistics statistics = data.getStatistics();
+            List<String> row = Lists.newArrayList(
+                    beId.toString(),
+                    String.valueOf(statistics.getScanRows()),
+                    String.valueOf(statistics.getScanBytes()),
+                    String.valueOf(statistics.getScanBytesFromLocalStorage()),
+                    String.valueOf(statistics.getScanBytesFromRemoteStorage()),
+                    String.valueOf(statistics.getBytesWriteIntoCache())
+            );
+
+            rows.add(row);
+
+            // Accumulate totals
+            totalScanRows += statistics.getScanRows();
+            totalScanBytes += statistics.getScanBytes();
+            totalScanBytesFromLocalStorage += statistics.getScanBytesFromLocalStorage();
+            totalScanBytesFromRemoteStorage += statistics.getScanBytesFromRemoteStorage();
+            totalBytesWriteIntoCache += statistics.getBytesWriteIntoCache();
+        }
+
+        // Add a total row
+        List<String> totalRow = Lists.newArrayList(
+                "TOTAL",
+                String.valueOf(totalScanRows),
+                String.valueOf(totalScanBytes),
+                String.valueOf(totalScanBytesFromLocalStorage),
+                String.valueOf(totalScanBytesFromRemoteStorage),
+                String.valueOf(totalBytesWriteIntoCache)
+        );
+        rows.add(totalRow);
+
+        ShowResultSet resultSet = new ShowResultSet(metaData, rows);
+        executor.sendResultSet(resultSet);
+    }
+
+    /**
+     * Skip auth check for warmup select command.
+     * The blackhole table is a system table and doesn't require LOAD privilege.
+     * Nereids analyzer will check SELECT privilege on source tables.
+     */
+    @Override
+    protected boolean needAuthCheck(TableIf targetTableIf) {
+        return false;
+    }
+}
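
End to end, the statement surface is plain SQL. Based on the parser tests and the result-set metadata in this patch, a session would look roughly like the following (table name and values are illustrative):

    SET disable_file_cache = false;
    WARM UP SELECT l_orderkey, l_discount FROM lineitem WHERE l_quantity > 10;

    -- Returns one row per BE plus a TOTAL row with the columns:
    -- BackendId | ScanRows | ScanBytes | ScanBytesFromLocalStorage | ScanBytesFromRemoteStorage | BytesWriteIntoCache

LIMIT, JOIN, UNION, GROUP BY and ORDER BY are rejected at parse time, as covered by NereidsParserTest and the regression suites later in this patch.
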
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalBlackholeSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalBlackholeSink.java
new file mode 100644
index 00000000000..97b6c220b86
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalBlackholeSink.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.PropagateFuncDeps;
+import org.apache.doris.nereids.trees.plans.algebra.Sink;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Logical blackhole sink.
+ * The blackhole sink is currently used in "warm up select" SQL statements to preload table data into the file cache.
+ * It is planned as a terminal sink (like /dev/null),
+ * meaning a "black hole" at the end of the execution plan that discards all incoming data.
+ */
+public class LogicalBlackholeSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE>
+        implements Sink, PropagateFuncDeps {
+
+    public LogicalBlackholeSink(List<NamedExpression> outputExprs, CHILD_TYPE child) {
+        super(PlanType.LOGICAL_BLACKHOLE_SINK, outputExprs, child);
+    }
+
+    public LogicalBlackholeSink(List<NamedExpression> outputExprs,
+            Optional<GroupExpression> groupExpression,
+            Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
+        super(PlanType.LOGICAL_BLACKHOLE_SINK, outputExprs, groupExpression, logicalProperties, child);
+    }
+
+    @Override
+    public LogicalBlackholeSink<Plan> withChildren(List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1,
+                "LogicalBlackholeSink's children size must be 1, but real is 
%s", children.size());
+        return new LogicalBlackholeSink<>(outputExprs, children.get(0));
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitLogicalBlackholeSink(this, context);
+    }
+
+    @Override
+    public LogicalBlackholeSink<Plan> withGroupExpression(Optional<GroupExpression> groupExpression) {
+        return new LogicalBlackholeSink<>(outputExprs, groupExpression, Optional.of(getLogicalProperties()), child());
+    }
+
+    @Override
+    public LogicalBlackholeSink<Plan> withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1, "LogicalBlackholeSink only accepts one child");
+        return new LogicalBlackholeSink<>(outputExprs, groupExpression, logicalProperties, children.get(0));
+    }
+
+    @Override
+    public LogicalBlackholeSink<CHILD_TYPE> withOutputExprs(List<NamedExpression> outputExprs) {
+        return new LogicalBlackholeSink<>(outputExprs, child());
+    }
+
+    @Override
+    public String toString() {
+        return Utils.toSqlString("LogicalBlackholeSink[" + id.asInt() + "]",
+                "outputExprs", outputExprs);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBlackholeSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBlackholeSink.java
new file mode 100644
index 00000000000..53666ccab14
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBlackholeSink.java
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.SqlCacheContext;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.ComputeResultSet;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.Sink;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.qe.ResultSet;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.base.Preconditions;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Physical blackhole sink.
+ * The blackhole sink is currently used in "warm up select" SQL statements to preload table data into the file cache.
+ * It is planned as a terminal sink (like /dev/null),
+ * meaning a "black hole" at the end of the execution plan that discards all incoming data.
+ */
+public class PhysicalBlackholeSink<CHILD_TYPE extends Plan> extends PhysicalSink<CHILD_TYPE>
+        implements Sink, ComputeResultSet {
+
+    public PhysicalBlackholeSink(List<NamedExpression> outputExprs, Optional<GroupExpression> groupExpression,
+            LogicalProperties logicalProperties, CHILD_TYPE child) {
+        this(outputExprs, groupExpression, logicalProperties, PhysicalProperties.GATHER, null, child);
+    }
+
+    public PhysicalBlackholeSink(List<NamedExpression> outputExprs, Optional<GroupExpression> groupExpression,
+            LogicalProperties logicalProperties, @Nullable PhysicalProperties physicalProperties,
+            Statistics statistics, CHILD_TYPE child) {
+        super(PlanType.PHYSICAL_BLACKHOLE_SINK, outputExprs, groupExpression,
+                logicalProperties, physicalProperties, statistics, child);
+    }
+
+    @Override
+    public PhysicalBlackholeSink<Plan> withChildren(List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1,
+                "PhysicalBlackholeSink's children size must be 1, but real is 
%s", children.size());
+        return new PhysicalBlackholeSink<>(outputExprs, groupExpression, 
getLogicalProperties(),
+                physicalProperties, statistics, children.get(0));
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitPhysicalBlackholeSink(this, context);
+    }
+
+    @Override
+    public List<? extends Expression> getExpressions() {
+        return outputExprs;
+    }
+
+    @Override
+    public PhysicalBlackholeSink<Plan> withGroupExpression(Optional<GroupExpression> groupExpression) {
+        return new PhysicalBlackholeSink<>(outputExprs, groupExpression, getLogicalProperties(),
+                physicalProperties, statistics, child());
+    }
+
+    @Override
+    public PhysicalBlackholeSink<Plan> withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1,
+                "PhysicalBlackholeSink's children size must be 1, but real is %s", children.size());
+        return new PhysicalBlackholeSink<>(outputExprs, groupExpression, logicalProperties.get(),
+                physicalProperties, statistics, children.get(0));
+    }
+
+    @Override
+    public PhysicalBlackholeSink<Plan> withPhysicalPropertiesAndStats(
+            PhysicalProperties physicalProperties, Statistics statistics) {
+        return new PhysicalBlackholeSink<>(outputExprs, groupExpression,
+                getLogicalProperties(), physicalProperties, statistics, 
child());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PhysicalBlackholeSink<?> that = (PhysicalBlackholeSink<?>) o;
+        return outputExprs.equals(that.outputExprs);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(outputExprs);
+    }
+
+    @Override
+    public String toString() {
+        return Utils.toSqlString("PhysicalBlackholeSink[" + id.asInt() + "]",
+                "outputExprs", outputExprs);
+    }
+
+    @Override
+    public PhysicalBlackholeSink<CHILD_TYPE> resetLogicalProperties() {
+        return new PhysicalBlackholeSink<>(outputExprs, groupExpression,
+                null, physicalProperties, statistics, child());
+    }
+
+    @Override
+    public Optional<ResultSet> computeResultInFe(
+            CascadesContext cascadesContext, Optional<SqlCacheContext> sqlCacheContext, List<Slot> outputSlots) {
+        CHILD_TYPE child = child();
+        if (child instanceof ComputeResultSet) {
+            return ((ComputeResultSet) child).computeResultInFe(cascadesContext, sqlCacheContext, outputSlots);
+        } else {
+            return Optional.empty();
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
index 06257229d4b..411398ad2e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.nereids.trees.plans.visitor;
 
+import org.apache.doris.nereids.analyzer.UnboundBlackholeSink;
 import org.apache.doris.nereids.analyzer.UnboundDictionarySink;
 import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
 import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
@@ -24,6 +25,7 @@ import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink;
 import org.apache.doris.nereids.analyzer.UnboundResultSink;
 import org.apache.doris.nereids.analyzer.UnboundTableSink;
 import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalBlackholeSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResultSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalDictionarySink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
@@ -34,6 +36,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalBlackholeSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalDictionarySink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
@@ -86,6 +89,10 @@ public interface SinkVisitor<R, C> {
         return visitLogicalSink(unboundDictionarySink, context);
     }
 
+    default R visitUnboundBlackholeSink(UnboundBlackholeSink<? extends Plan> unboundBlackholeSink, C context) {
+        return visitLogicalSink(unboundBlackholeSink, context);
+    }
+
     // *******************************
     // logical
     // *******************************
@@ -127,10 +134,20 @@ public interface SinkVisitor<R, C> {
         return visitLogicalSink(logicalDeferMaterializeResultSink, context);
     }
 
+    default R visitLogicalBlackholeSink(
+            LogicalBlackholeSink<? extends Plan> logicalBlackholeSink, C context) {
+        return visitLogicalSink(logicalBlackholeSink, context);
+    }
+
     // *******************************
     // physical
     // *******************************
 
+    default R visitPhysicalBlackholeSink(
+            PhysicalBlackholeSink<? extends Plan> sink, C context) {
+        return visitPhysicalSink(sink, context);
+    }
+
     default R visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink, C context) {
         return visitPhysicalSink(fileSink, context);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BlackholeSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BlackholeSink.java
new file mode 100644
index 00000000000..9a3994a036d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BlackholeSink.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.thrift.TBlackholeSink;
+import org.apache.doris.thrift.TDataSink;
+import org.apache.doris.thrift.TDataSinkType;
+import org.apache.doris.thrift.TExplainLevel;
+
+/**
+ * BlackholeSink is currently used in "warm up select" SQL statements to preload table data into the file cache.
+ * It is planned as a terminal sink (like /dev/null),
+ * meaning a "black hole" at the end of the execution plan that discards all incoming data.
+ */
+public class BlackholeSink extends DataSink {
+    private final PlanNodeId exchNodeId;
+
+    public BlackholeSink(PlanNodeId exchNodeId) {
+        this.exchNodeId = exchNodeId;
+    }
+
+    @Override
+    public String getExplainString(String prefix, TExplainLevel explainLevel) {
+        StringBuilder strBuilder = new StringBuilder();
+        strBuilder.append(prefix).append("BLACKHOLE SINK\n");
+        return strBuilder.toString();
+    }
+
+    @Override
+    protected TDataSink toThrift() {
+        TDataSink result = new TDataSink(TDataSinkType.BLACKHOLE_SINK);
+        TBlackholeSink tBlackholeSink = new TBlackholeSink();
+        result.setBlackholeSink(tBlackholeSink);
+        return result;
+    }
+
+    public PlanNodeId getExchNodeId() {
+        return exchNodeId;
+    }
+
+    @Override
+    public DataPartition getOutputPartition() {
+        // the blackhole sink discards its input, so it exposes no output partition
+        return null;
+    }
+}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index fafb736c15e..cc31e7858e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.plugin.AuditEvent;
 import org.apache.doris.thrift.TQueryStatistics;
+import org.apache.doris.thrift.TQueryStatisticsResult;
 import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams;
 
 import com.google.common.collect.Lists;
@@ -59,7 +60,7 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
             this.beLastReportTime = beLastReportTime;
         }
 
-        Map<String, Pair<Long, TQueryStatistics>> queryStatsMap = Maps.newConcurrentMap();
+        Map<String, Pair<Long, TQueryStatisticsResult>> queryStatsMap = Maps.newConcurrentMap();
     }
 
     public WorkloadRuntimeStatusMgr() {
@@ -164,7 +165,7 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
             LOG.warn("be report workload runtime status but without beid");
             return;
         }
-        if (!params.isSetQueryStatisticsMap()) {
+        if (!params.isSetQueryStatisticsResultMap()) {
             LOG.warn("be report workload runtime status but without query 
stats map");
             return;
         }
@@ -180,8 +181,8 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
         } else {
             beReportInfo.beLastReportTime = currentTime;
         }
-        for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) {
-            beReportInfo.queryStatsMap.put(entry.getKey(), Pair.of(currentTime, (TQueryStatistics) entry.getValue()));
+        for (Map.Entry<String, TQueryStatisticsResult> entry : params.query_statistics_result_map.entrySet()) {
+            beReportInfo.queryStatsMap.put(entry.getKey(), Pair.of(currentTime, entry.getValue()));
         }
     }
 
@@ -197,7 +198,7 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
             }
             Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
             for (String queryId : queryIdSet) {
-                Pair<Long, TQueryStatistics> pair = beReportInfo.queryStatsMap.get(queryId);
+                Pair<Long, TQueryStatisticsResult> pair = beReportInfo.queryStatsMap.get(queryId);
                 long queryLastReportTime = pair.first;
                 if (currentTime - queryLastReportTime > Config.be_report_query_statistics_timeout_ms) {
                     beReportInfo.queryStatsMap.remove(queryId);
@@ -216,7 +217,7 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
             BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
             Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
             for (String queryId : queryIdSet) {
-                TQueryStatistics curQueryStats = beReportInfo.queryStatsMap.get(queryId).second;
+                TQueryStatisticsResult curQueryStats = beReportInfo.queryStatsMap.get(queryId).second;
 
                 TQueryStatistics retQuery = resultQueryMap.get(queryId);
                 if (retQuery == null) {
@@ -230,19 +231,35 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
         return resultQueryMap;
     }
 
-    private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) {
-        dst.scan_rows += src.scan_rows;
-        dst.scan_bytes += src.scan_bytes;
-        dst.scan_bytes_from_local_storage += src.scan_bytes_from_local_storage;
-        dst.scan_bytes_from_remote_storage += src.scan_bytes_from_remote_storage;
-        dst.cpu_ms += src.cpu_ms;
-        dst.shuffle_send_bytes += src.shuffle_send_bytes;
-        dst.shuffle_send_rows += src.shuffle_send_rows;
-        if (dst.max_peak_memory_bytes < src.max_peak_memory_bytes) {
-            dst.max_peak_memory_bytes = src.max_peak_memory_bytes;
+    public Map<Long, TQueryStatisticsResult> getQueryStatistics(String queryId) {
+        Map<Long, TQueryStatisticsResult> result = Maps.newHashMap();
+        for (Map.Entry<Long, BeReportInfo> entry : beToQueryStatsMap.entrySet()) {
+            Pair<Long, TQueryStatisticsResult> pair = entry.getValue().queryStatsMap.get(queryId);
+            if (pair != null) {
+                result.put(entry.getKey(), pair.second);
+            }
+        }
+        return result;
+    }
+
+
+    private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatisticsResult src) {
+        TQueryStatistics srcStats = src.getStatistics();
+        if (srcStats == null) {
+            return;
         }
-        dst.spill_write_bytes_to_local_storage += src.spill_write_bytes_to_local_storage;
-        dst.spill_read_bytes_from_local_storage += src.spill_read_bytes_from_local_storage;
+        dst.scan_rows += srcStats.scan_rows;
+        dst.scan_bytes += srcStats.scan_bytes;
+        dst.scan_bytes_from_local_storage += srcStats.scan_bytes_from_local_storage;
+        dst.scan_bytes_from_remote_storage += srcStats.scan_bytes_from_remote_storage;
+        dst.cpu_ms += srcStats.cpu_ms;
+        dst.shuffle_send_bytes += srcStats.shuffle_send_bytes;
+        dst.shuffle_send_rows += srcStats.shuffle_send_rows;
+        if (dst.max_peak_memory_bytes < srcStats.max_peak_memory_bytes) {
+            dst.max_peak_memory_bytes = srcStats.max_peak_memory_bytes;
+        }
+        dst.spill_write_bytes_to_local_storage += srcStats.spill_write_bytes_to_local_storage;
+        dst.spill_read_bytes_from_local_storage += srcStats.spill_read_bytes_from_local_storage;
     }
 
     private void queryAuditEventLogWriteLock() {
@@ -252,4 +269,5 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
     private void queryAuditEventLogWriteUnlock() {
         queryAuditEventLock.unlock();
     }
+
 }
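
getQueryStatistics(String) is the accessor WarmupSelectCommand polls for per-BE results. A caller sketch mirroring its usage in this patch (queryId is assumed to be the TUniqueId of a running query):

    // Fetch the latest statistics each BE has reported for the query.
    Map<Long, TQueryStatisticsResult> perBe = Env.getCurrentEnv().getWorkloadRuntimeStatusMgr()
            .getQueryStatistics(DebugUtil.printId(queryId));
    for (Map.Entry<Long, TQueryStatisticsResult> entry : perBe.entrySet()) {
        TQueryStatisticsResult result = entry.getValue();
        boolean finished = result.isSetQueryFinished() && result.isQueryFinished();
        // result.getStatistics() may be null until that BE has reported actual stats
    }
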
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/NereidsParserTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/NereidsParserTest.java
index cf373be7507..7ff1273fcca 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/NereidsParserTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/NereidsParserTest.java
@@ -1461,4 +1461,31 @@ public class NereidsParserTest extends ParserTestBase {
                 + "WHEN NOT MATCHED THEN INSERT VALUES (c1, c2, c3)";
         Assertions.assertThrows(ParseException.class, () -> parser.parseSingle(invalidSql4));
     }
+
+    @Test
+    public void testWarmUpSelect() {
+        ConnectContext ctx = ConnectContext.get();
+        ctx.getSessionVariable().setEnableFileCache(true);
+        ctx.getSessionVariable().setDisableFileCache(false);
+        NereidsParser nereidsParser = new NereidsParser();
+
+        // Test basic warm up select statement
+        String warmUpSql = "WARM UP SELECT * FROM test_table";
+        LogicalPlan logicalPlan = nereidsParser.parseSingle(warmUpSql);
+        Assertions.assertNotNull(logicalPlan);
+        Assertions.assertEquals(StmtType.INSERT, logicalPlan.stmtType());
+
+        // Test warm up select with where clause
+        String warmUpSqlWithWhere = "WARM UP SELECT id, name FROM test_table WHERE id > 10";
+        LogicalPlan logicalPlanWithWhere = nereidsParser.parseSingle(warmUpSqlWithWhere);
+        Assertions.assertNotNull(logicalPlanWithWhere);
+        Assertions.assertEquals(StmtType.INSERT, logicalPlanWithWhere.stmtType());
+
+        // Negative cases: LIMIT, JOIN, UNION, AGGREGATE not allowed
+        Assertions.assertThrows(ParseException.class, () -> nereidsParser.parseSingle("WARM UP SELECT * FROM test_table LIMIT 100"));
+        Assertions.assertThrows(ParseException.class, () -> nereidsParser.parseSingle("WARM UP SELECT * FROM t1 JOIN t2 ON t1.id = t2.id"));
+        Assertions.assertThrows(ParseException.class, () -> nereidsParser.parseSingle("WARM UP SELECT * FROM t1 UNION SELECT * FROM t2"));
+        Assertions.assertThrows(ParseException.class, () -> nereidsParser.parseSingle("WARM UP SELECT id, COUNT(*) FROM test_table GROUP BY id"));
+        Assertions.assertThrows(ParseException.class, () -> nereidsParser.parseSingle("WARM UP SELECT * FROM test_table ORDER BY id"));
+    }
 }
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index fb4ebd90e00..bda5daa03bf 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -39,6 +39,7 @@ message PQueryStatistics {
     optional int64 scan_bytes_from_remote_storage = 8;
     optional int64 spill_write_bytes_to_local_storage = 9;
     optional int64 spill_read_bytes_from_local_storage = 10;
+    optional int64 bytes_write_into_cache = 11;
 }
 
 message PRowBatch {
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index dcb62ea4159..d218db5dba5 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -41,6 +41,7 @@ enum TDataSinkType {
     HIVE_TABLE_SINK = 13,
     ICEBERG_TABLE_SINK = 14,
     DICTIONARY_SINK = 15,
+    BLACKHOLE_SINK = 16,
 }
 
 enum TResultSinkType {
@@ -444,6 +445,9 @@ struct TDictionarySink {
     9: optional i64 memory_limit
 }
 
+struct TBlackholeSink {
+}
+
 struct TDataSink {
   1: required TDataSinkType type
   2: optional TDataStreamSink stream_sink
@@ -459,4 +463,5 @@ struct TDataSink {
   13: optional THiveTableSink hive_table_sink
   14: optional TIcebergTableSink iceberg_table_sink
   15: optional TDictionarySink dictionary_sink
+  16: optional TBlackholeSink blackhole_sink
 }
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index eca2ef59d9e..b9fe52c59f2 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -159,6 +159,7 @@ enum TSchemaTableType {
     SCH_SQL_BLOCK_RULE_STATUS = 59;
     SCH_CLUSTER_SNAPSHOTS = 60;
     SCH_CLUSTER_SNAPSHOT_PROPERTIES = 61;
+    SCH_BLACKHOLE = 62;
 }
 
 enum THdfsCompression {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index f0399df51ec..54a6d1555e5 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -201,11 +201,18 @@ struct TQueryStatistics {
     11: optional i64 scan_bytes_from_remote_storage
     12: optional i64 spill_write_bytes_to_local_storage
     13: optional i64 spill_read_bytes_from_local_storage
+    14: optional i64 bytes_write_into_cache
+}
+
+struct TQueryStatisticsResult {
+    1: optional bool query_finished
+    2: optional TQueryStatistics statistics
 }
 
 struct TReportWorkloadRuntimeStatusParams {
     1: optional i64 backend_id
-    2: optional map<string, TQueryStatistics> query_statistics_map
+    2: optional map<string, TQueryStatistics> query_statistics_map // deprecated
+    3: optional map<string, TQueryStatisticsResult> query_statistics_result_map
 }
 
 struct TQueryProfile {
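
The new wrapper struct lets a BE report liveness and stats in one map entry. With the thrift-generated Java bindings, a report entry would be assembled roughly as follows (all field values are illustrative):

    // Wrap per-query stats with a query_finished flag, as consumed by WorkloadRuntimeStatusMgr above.
    TQueryStatistics stats = new TQueryStatistics();
    stats.setScanRows(1000L);
    stats.setBytesWriteIntoCache(4096L); // field 14, added by this patch

    TQueryStatisticsResult result = new TQueryStatisticsResult();
    result.setQueryFinished(true);
    result.setStatistics(stats);

    TReportWorkloadRuntimeStatusParams params = new TReportWorkloadRuntimeStatusParams();
    params.setBackendId(10001L);
    params.putToQueryStatisticsResultMap("<query id string>", result);
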
diff --git a/regression-test/suites/cloud_p0/warm_up_select/test_warmup_select.groovy b/regression-test/suites/cloud_p0/warm_up_select/test_warmup_select.groovy
new file mode 100644
index 00000000000..2b0b39affc2
--- /dev/null
+++ b/regression-test/suites/cloud_p0/warm_up_select/test_warmup_select.groovy
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_warmup_select") {
+    sql """
+    drop table if exists lineitem
+    """
+    sql """
+    CREATE TABLE IF NOT EXISTS lineitem (
+      l_orderkey    INTEGER NOT NULL,
+      l_partkey     INTEGER NOT NULL,
+      l_suppkey     INTEGER NOT NULL,
+      l_linenumber  INTEGER NOT NULL,
+      l_quantity    DECIMALV3(15,2) NOT NULL,
+      l_extendedprice  DECIMALV3(15,2) NOT NULL,
+      l_discount    DECIMALV3(15,2) NOT NULL,
+      l_tax         DECIMALV3(15,2) NOT NULL,
+      l_returnflag  CHAR(1) NOT NULL,
+      l_linestatus  CHAR(1) NOT NULL,
+      l_shipdate    DATE NOT NULL,
+      l_commitdate  DATE NOT NULL,
+      l_receiptdate DATE NOT NULL,
+      l_shipinstruct CHAR(25) NOT NULL,
+      l_shipmode     CHAR(10) NOT NULL,
+      l_comment      VARCHAR(44) NOT NULL
+    )
+    DUPLICATE KEY(l_orderkey, l_partkey, l_suppkey, l_linenumber)
+    PARTITION BY RANGE(l_shipdate) (
+    PARTITION `day_2` VALUES LESS THAN ('2023-12-9'),
+    PARTITION `day_3` VALUES LESS THAN ("2023-12-11"),
+    PARTITION `day_4` VALUES LESS THAN ("2023-12-30")
+    )
+    DISTRIBUTED BY HASH(l_orderkey) BUCKETS 3
+    PROPERTIES (
+      "replication_num" = "1"
+    )
+    """
+
+    sql """
+    insert into lineitem values
+    (1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-08', '2023-12-09', '2023-12-10', 'a', 'b', 'yyyyyyyyy'),
+    (2, 4, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-09', '2023-12-09', '2023-12-10', 'a', 'b', 'yyyyyyyyy'),
+    (3, 2, 4, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-10', '2023-12-09', '2023-12-10', 'a', 'b', 'yyyyyyyyy'),
+    (4, 3, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-11', '2023-12-09', '2023-12-10', 'a', 'b', 'yyyyyyyyy'),
+    (5, 2, 3, 6, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-12-12', '2023-12-12', '2023-12-13', 'c', 'd', 'xxxxxxxxx');
+    """
+
+    def test_basic_warmup = {
+        // Enable file cache for warm up functionality
+        sql "set disable_file_cache=false"
+
+        sql "WARM UP SELECT * FROM lineitem"
+
+        sql "WARM UP SELECT l_orderkey, l_discount FROM lineitem"
+
+        sql "WARM UP SELECT l_orderkey, l_discount FROM lineitem WHERE 
l_quantity > 10"
+    }
+
+    def test_warmup_negative_cases = {
+        // Enable file cache for warm up functionality
+        sql "set disable_file_cache=false"
+
+        // These should fail as warm up select doesn't support these operations
+        try {
+            sql "WARM UP SELECT * FROM lineitem LIMIT 5"
+            assert false : "Expected ParseException for LIMIT clause"
+        } catch (Exception e) {
+            // Expected to fail
+            println "LIMIT clause correctly rejected for WARM UP SELECT"
+        }
+
+        try {
+            sql "WARM UP SELECT l_shipmode, COUNT(*) FROM lineitem GROUP BY 
l_shipmode"
+            assert false : "Expected ParseException for GROUP BY clause"
+        } catch (Exception e) {
+            // Expected to fail
+            println "GROUP BY clause correctly rejected for WARM UP SELECT"
+        }
+
+        try {
+            sql "WARM UP SELECT * FROM lineitem t1 JOIN lineitem t2 ON 
t1.l_orderkey = t2.l_orderkey"
+            assert false : "Expected ParseException for JOIN clause"
+        } catch (Exception e) {
+            // Expected to fail
+            println "JOIN clause correctly rejected for WARM UP SELECT"
+        }
+
+        try {
+            sql "WARM UP SELECT * FROM lineitem UNION SELECT * FROM lineitem"
+            assert false : "Expected ParseException for UNION clause"
+        } catch (Exception e) {
+            // Expected to fail
+            println "UNION clause correctly rejected for WARM UP SELECT"
+        }
+    }
+
+    test_basic_warmup()
+    test_warmup_negative_cases()
+}
\ No newline at end of file
diff --git a/regression-test/suites/external_table_p0/cache/test_hive_warmup_select.groovy b/regression-test/suites/external_table_p0/cache/test_hive_warmup_select.groovy
new file mode 100644
index 00000000000..7d270ffe3b8
--- /dev/null
+++ b/regression-test/suites/external_table_p0/cache/test_hive_warmup_select.groovy
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hive_warmup_select", 
"p0,external,hive,external_docker,external_docker_hive") {
+    String enabled = context.config.otherConfigs.get("enableHiveTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("Hive test is disabled; skipping.")
+        return
+    }
+
+    def test_basic_warmup = {
+        // Enable file cache for warm up functionality
+        sql "set enable_file_cache=true"
+        sql "set disable_file_cache=false"
+
+        sql "WARM UP SELECT * FROM lineitem"
+
+        sql "WARM UP SELECT l_orderkey, l_discount FROM lineitem"
+
+        sql "WARM UP SELECT l_orderkey, l_discount FROM lineitem WHERE l_quantity > 10"
+    }
+
+    def test_warmup_negative_cases = {
+        // Enable file cache for warm up functionality
+        sql "set enable_file_cache=true"
+        sql "set disable_file_cache=false"
+
+        // These should fail because WARM UP SELECT does not support these clauses
+        try {
+            sql "WARM UP SELECT * FROM lineitem LIMIT 5"
+            assert false : "Expected ParseException for LIMIT clause"
+        } catch (Exception e) {
+            // Expected to fail
+            println "LIMIT clause correctly rejected for WARM UP SELECT"
+        }
+
+        try {
+            sql "WARM UP SELECT l_shipmode, COUNT(*) FROM lineitem GROUP BY l_shipmode"
+            assert false : "Expected ParseException for GROUP BY clause"
+        } catch (Exception e) {
+            // Expected to fail
+            println "GROUP BY clause correctly rejected for WARM UP SELECT"
+        }
+
+        try {
+            sql "WARM UP SELECT * FROM lineitem t1 JOIN lineitem t2 ON t1.l_orderkey = t2.l_orderkey"
+            assert false : "Expected ParseException for JOIN clause"
+        } catch (Exception e) {
+            // Expected to fail
+            println "JOIN clause correctly rejected for WARM UP SELECT"
+        }
+
+        try {
+            sql "WARM UP SELECT * FROM lineitem UNION SELECT * FROM lineitem"
+            assert false : "Expected ParseException for UNION clause"
+        } catch (Exception e) {
+            // Expected to fail
+            println "UNION clause correctly rejected for WARM UP SELECT"
+        }
+    }
+
+    def test_warmup_permission = { String catalog_name ->
+        // Test that WARM UP SELECT only requires SELECT privilege, not LOAD privilege
+        def user1 = 'test_hive_warmup_user1'
+        def pwd = '123456'
+        def tokens = context.config.jdbcUrl.split('/')
+        def url = tokens[0] + "//" + tokens[2] + "/?defaultCatalog=${catalog_name}"
+
+        // Clean up
+        sql """DROP USER IF EXISTS ${user1}"""
+
+        try {
+            // Create user with only SELECT privilege on external catalog
+            sql """CREATE USER '${user1}' IDENTIFIED BY '${pwd}'"""
+            sql """GRANT SELECT_PRIV ON ${catalog_name}.tpch1_parquet.lineitem 
TO ${user1}"""
+
+            // Test: user with only SELECT privilege should be able to run WARM UP SELECT
+            connect(user1, "${pwd}", url) {
+                sql "set enable_file_cache=true"
+                sql "set disable_file_cache=false"
+
+                // This should succeed - only SELECT privilege is needed
+                sql "WARM UP SELECT * FROM 
${catalog_name}.tpch1_parquet.lineitem"
+                
+                sql "WARM UP SELECT l_orderkey, l_discount FROM 
${catalog_name}.tpch1_parquet.lineitem WHERE l_quantity > 10"
+                
+                // Verify regular SELECT also works
+                def result = sql "SELECT COUNT(*) FROM ${catalog_name}.tpch1_parquet.lineitem"
+                assert result.size() > 0
+            }
+
+            // Test: user without SELECT privilege should fail
+            sql """REVOKE SELECT_PRIV ON 
${catalog_name}.tpch1_parquet.lineitem FROM ${user1}"""
+            
+            connect(user1, "${pwd}", url) {
+                sql "set enable_file_cache=true"
+                sql "set disable_file_cache=false"
+                test {
+                    sql "WARM UP SELECT * FROM 
${catalog_name}.tpch1_parquet.lineitem"
+                    exception "denied"
+                }
+            }
+
+            // Test: user with LOAD privilege but no SELECT privilege should also fail
+            sql """GRANT LOAD_PRIV ON ${catalog_name}.tpch1_parquet.lineitem TO ${user1}"""
+
+            connect(user1, "${pwd}", url) {
+                sql "set enable_file_cache=true"
+                sql "set disable_file_cache=false"
+                test {
+                    sql "WARM UP SELECT * FROM 
${catalog_name}.tpch1_parquet.lineitem"
+                    exception "denied"
+                }
+            }
+
+        } finally {
+            // Clean up
+            sql """DROP USER IF EXISTS ${user1}"""
+        }
+    }
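+
+    // Illustrative variant (a sketch built on assumptions, not part of the
+    // original test and not invoked below): since WARM UP SELECT is read-only,
+    // a database-level SELECT_PRIV grant should be sufficient as well, not
+    // just the table-level grant exercised above. `test_warmup_db_level_grant`
+    // is a hypothetical name; it reuses only helpers already used in this suite.
+    def test_warmup_db_level_grant = { String ctl, String user, String pwd, String url ->
+        sql """GRANT SELECT_PRIV ON ${ctl}.tpch1_parquet.* TO ${user}"""
+        connect(user, pwd, url) {
+            sql "set enable_file_cache=true"
+            sql "set disable_file_cache=false"
+            sql "WARM UP SELECT * FROM ${ctl}.tpch1_parquet.lineitem"
+        }
+        sql """REVOKE SELECT_PRIV ON ${ctl}.tpch1_parquet.* FROM ${user}"""
+    }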
+
+    for (String hivePrefix : ["hive3"]) {
        String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
+        String catalog_name = "test_${hivePrefix}_warmup_select"
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+        sql """drop catalog if exists ${catalog_name}"""
+        sql """create catalog if not exists ${catalog_name} properties (
+            "type"="hms",
+            'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}'
+        );"""
+        sql """switch ${catalog_name}"""
+        sql """use `tpch1_parquet`"""
+
+        test_basic_warmup()
+        test_warmup_negative_cases()
+        test_warmup_permission(catalog_name)
+
+        sql """drop catalog if exists ${catalog_name}"""
+    }
+}
+

