This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a974d4808a [feature](iceberg) Support static partition overwrite for Iceberg tables (#58396)
8a974d4808a is described below

commit 8a974d4808af5173f6c43e2dc68a1c970f392535
Author: Socrates <[email protected]>
AuthorDate: Fri Dec 5 14:09:47 2025 +0800

    [feature](iceberg) Support static partition overwrite for Iceberg tables (#58396)
    
    ### Proposed changes
    
    This PR implements static partition overwrite functionality for Iceberg
    external tables, allowing users to precisely overwrite specific
    partitions using the `INSERT OVERWRITE ... PARTITION (col='value', ...)`
    syntax.
    
    ### Background
    
    Before this PR, Doris supported:
    - ✅ `INSERT INTO` with dynamic partition for Iceberg tables
    - ✅ `INSERT OVERWRITE` for full table replacement
    - ❌ `INSERT OVERWRITE ... PARTITION (...)` for static partition overwrite
    
    ### New Features
    
    1. **Full Static Partition Mode**: Overwrite a specific partition when all partition columns are specified
       ```sql
       INSERT OVERWRITE TABLE iceberg_db.tbl PARTITION (dt='2025-01-25', region='bj')
       SELECT id, name FROM source_table;
       ```
    
    2. **Hybrid Partition Mode**: Partial static + partial dynamic partition
       ```sql
       -- dt is static, region comes from SELECT dynamically
       INSERT OVERWRITE TABLE iceberg_db.tbl PARTITION (dt='2025-01-25')
       SELECT id, name, region FROM source_table;
       ```
    
    ### Implementation Details
    
    #### FE Changes
    - **Parser** (`DorisParser.g4`, `LogicalPlanBuilder.java`): Extended partition spec parsing to support the `PARTITION (col='value', ...)` syntax
    - **InsertPartitionSpec**: New unified data structure to represent partition modes (auto-detect, dynamic, static)
    - **UnboundIcebergTableSink**: Added `staticPartitionKeyValues` field to carry static partition info
    - **BindSink**: Added validation of static partition columns and generation of constant expressions for static partition values
    - **IcebergTransaction**: Implemented `commitStaticPartitionOverwrite()` using Iceberg's `OverwriteFiles.overwriteByRowFilter()` API (see the sketch after this list)
    - **IcebergUtils**: Added `parsePartitionValueFromString()` utility for partition value type conversion
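
    The sketch below is illustrative only (the class and method names are hypothetical, not part of this commit): it shows the general `overwriteByRowFilter()` pattern that `commitStaticPartitionOverwrite()` and `buildPartitionFilter()` implement in the diff below, where each statically specified partition column becomes an equality predicate on its source column (identity partitions only) and the predicates are AND-ed together.
    ```java
    // Hypothetical sketch of the static-partition overwrite pattern; see
    // IcebergTransaction.commitStaticPartitionOverwrite() in this commit for
    // the real implementation (branch handling, scan thread pool, etc.).
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.OverwriteFiles;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;

    import java.util.List;
    import java.util.Map;

    class StaticPartitionOverwriteSketch {
        // Keys are source column names of identity partition fields; values are
        // the parsed static partition values.
        static void overwrite(Table table, Map<String, Object> staticValues, List<DataFile> newFiles) {
            // AND together one equality predicate per static partition column.
            Expression filter = Expressions.alwaysTrue();
            for (Map.Entry<String, Object> e : staticValues.entrySet()) {
                Expression eq = e.getValue() == null
                        ? Expressions.isNull(e.getKey())
                        : Expressions.equal(e.getKey(), e.getValue());
                filter = Expressions.and(filter, eq);
            }
            // Delete only the rows matching the filter, then add the new files.
            OverwriteFiles overwrite = table.newOverwrite().overwriteByRowFilter(filter);
            newFiles.forEach(overwrite::addFile);
            overwrite.commit();
        }
    }
    ```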
    
    #### BE Changes
    - **VIcebergTableWriter**:
      - Support full static partition mode (all data goes to a single partition)
      - Support hybrid partition mode (static columns from config, dynamic columns from data)
      - Added `_is_full_static_partition` and `_partition_column_is_static` for mode detection
    
    #### Thrift Changes
    - Added `static_partition_values` field to `TIcebergTableSink` for passing static partition info from FE to BE
---
 .../sink/writer/iceberg/viceberg_table_writer.cpp  |  227 +++-
 .../sink/writer/iceberg/viceberg_table_writer.h    |   21 +
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |    6 +-
 .../datasource/iceberg/IcebergTransaction.java     |  106 +-
 .../nereids/analyzer/UnboundIcebergTableSink.java  |   42 +-
 .../nereids/analyzer/UnboundTableSinkCreator.java  |   24 +-
 .../doris/nereids/parser/InsertPartitionSpec.java  |  109 ++
 .../doris/nereids/parser/LogicalPlanBuilder.java   |   76 +-
 .../doris/nereids/rules/analysis/BindSink.java     |  106 +-
 .../insert/IcebergInsertCommandContext.java        |   23 +
 .../insert/InsertOverwriteTableCommand.java        |   35 +-
 .../org/apache/doris/planner/IcebergTableSink.java |    8 +
 .../nereids/parser/InsertPartitionSpecTest.java    |  217 ++++
 .../parser/ParseInsertPartitionSpecTest.java       |  271 +++++
 gensrc/thrift/DataSinks.thrift                     |    4 +
 .../test_iceberg_static_partition_overwrite.out    |  135 +++
 .../test_iceberg_static_partition_overwrite.groovy | 1086 ++++++++++++++++++++
 17 files changed, 2432 insertions(+), 64 deletions(-)

diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp 
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 451313053ea..24b29df5cdb 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -18,9 +18,11 @@
 #include "viceberg_table_writer.h"
 
 #include "runtime/runtime_state.h"
+#include "vec/columns/column_const.h"
 #include "vec/core/block.h"
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/core/materialize_block.h"
+#include "vec/data_types/serde/data_type_serde.h"
 #include "vec/exec/format/table/iceberg/partition_spec_parser.h"
 #include "vec/exec/format/table/iceberg/schema_parser.h"
 #include "vec/exprs/vexpr.h"
@@ -78,6 +80,9 @@ Status VIcebergTableWriter::open(RuntimeState* state, 
RuntimeProfile* profile) {
             _partition_spec = iceberg::PartitionSpecParser::from_json(_schema, 
partition_spec_json);
             _iceberg_partition_columns = _to_iceberg_partition_columns();
         }
+
+        // Initialize static partition values if present
+        _init_static_partition_values();
     } catch (doris::Exception& e) {
         return e.to_status();
     }
@@ -116,6 +121,68 @@ VIcebergTableWriter::_to_iceberg_partition_columns() {
     return partition_columns;
 }
 
+void VIcebergTableWriter::_init_static_partition_values() {
+    auto& iceberg_sink = _t_sink.iceberg_table_sink;
+    if (!iceberg_sink.__isset.static_partition_values ||
+        iceberg_sink.static_partition_values.empty()) {
+        _has_static_partition = false;
+        _is_full_static_partition = false;
+        return;
+    }
+
+    _has_static_partition = true;
+    const auto& static_values_map = iceberg_sink.static_partition_values;
+
+    size_t num_cols = _iceberg_partition_columns.size();
+    _partition_column_static_values.resize(num_cols);
+    _partition_column_is_static.assign(num_cols, 0);
+
+    size_t dynamic_count = 0;
+    for (size_t i = 0; i < num_cols; ++i) {
+        const std::string& col_name = 
_iceberg_partition_columns[i].field().name();
+        auto it = static_values_map.find(col_name);
+        if (it != static_values_map.end()) {
+            _partition_column_static_values[i] = it->second;
+            _partition_column_is_static[i] = 1;
+        } else {
+            dynamic_count++;
+        }
+    }
+
+    // Check if all partition columns are statically specified
+    _is_full_static_partition = (dynamic_count == 0);
+
+    // Build static partition path prefix
+    _static_partition_path = _build_static_partition_path();
+
+    // For full static mode, build complete partition value list
+    if (_is_full_static_partition) {
+        _static_partition_value_list = _partition_column_static_values;
+    }
+}
+
+/**
+ * Builds the partition path string for static partition columns.
+ * Only static partition columns are included, dynamic ones are skipped.
+ * Format: "column1=value1/column2=value2/..."
+ * Example: "year=2023/month=12"
+ */
+std::string VIcebergTableWriter::_build_static_partition_path() {
+    std::stringstream ss;
+    bool first = true;
+    for (size_t i = 0; i < _iceberg_partition_columns.size(); ++i) {
+        if (_partition_column_is_static[i]) {
+            if (!first) {
+                ss << "/";
+            }
+            first = false;
+            ss << _escape(_iceberg_partition_columns[i].field().name()) << "="
+               << _escape(_partition_column_static_values[i]);
+        }
+    }
+    return ss.str();
+}
+
 Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& 
block) {
     SCOPED_RAW_TIMER(&_send_data_ns);
     if (block.rows() == 0) {
@@ -129,6 +196,49 @@ Status VIcebergTableWriter::write(RuntimeState* state, 
vectorized::Block& block)
     std::unordered_map<std::shared_ptr<VIcebergPartitionWriter>, 
IColumn::Filter> writer_positions;
     _row_count += output_block.rows();
 
+    // Case 1: Full static partition - all data goes to a single partition
+    if (_is_full_static_partition) {
+        std::shared_ptr<VIcebergPartitionWriter> writer;
+        {
+            SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
+            auto writer_iter = 
_partitions_to_writers.find(_static_partition_path);
+            if (writer_iter == _partitions_to_writers.end()) {
+                try {
+                    writer = _create_partition_writer(nullptr, -1);
+                } catch (doris::Exception& e) {
+                    return e.to_status();
+                }
+                _partitions_to_writers.insert({_static_partition_path, 
writer});
+                RETURN_IF_ERROR(writer->open(_state, _operator_profile));
+            } else {
+                if (writer_iter->second->written_len() > 
_target_file_size_bytes) {
+                    std::string file_name(writer_iter->second->file_name());
+                    int file_name_index = 
writer_iter->second->file_name_index();
+                    {
+                        SCOPED_RAW_TIMER(&_close_ns);
+                        
static_cast<void>(writer_iter->second->close(Status::OK()));
+                    }
+                    _partitions_to_writers.erase(writer_iter);
+                    try {
+                        writer = _create_partition_writer(nullptr, -1, 
&file_name,
+                                                          file_name_index + 1);
+                    } catch (doris::Exception& e) {
+                        return e.to_status();
+                    }
+                    _partitions_to_writers.insert({_static_partition_path, 
writer});
+                    RETURN_IF_ERROR(writer->open(_state, _operator_profile));
+                } else {
+                    writer = writer_iter->second;
+                }
+            }
+        }
+        SCOPED_RAW_TIMER(&_partition_writers_write_ns);
+        output_block.erase(_non_write_columns_indices);
+        RETURN_IF_ERROR(writer->write(output_block));
+        return Status::OK();
+    }
+
+    // Case 2: Non-partitioned table
     if (_iceberg_partition_columns.empty()) {
         std::shared_ptr<VIcebergPartitionWriter> writer;
         {
@@ -174,9 +284,24 @@ Status VIcebergTableWriter::write(RuntimeState* state, 
vectorized::Block& block)
         Block transformed_block;
         SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
         transformed_block.reserve(_iceberg_partition_columns.size());
-        for (auto& iceberg_partition_columns : _iceberg_partition_columns) {
-            
transformed_block.insert(iceberg_partition_columns.partition_column_transform().apply(
-                    output_block, iceberg_partition_columns.source_idx()));
+        for (int i = 0; i < _iceberg_partition_columns.size(); ++i) {
+            auto& iceberg_partition_columns = _iceberg_partition_columns[i];
+            if (_has_static_partition && _partition_column_is_static[i]) {
+                auto result_type =
+                        
iceberg_partition_columns.partition_column_transform().get_result_type();
+                auto data_col = result_type->create_column();
+                StringRef str_ref(_partition_column_static_values[i].data(),
+                                  _partition_column_static_values[i].size());
+                DataTypeSerDe::FormatOptions options;
+                RETURN_IF_ERROR(result_type->get_serde()->from_string(str_ref, 
*data_col, options));
+                auto col = ColumnConst::create(std::move(data_col), 
output_block.rows());
+                transformed_block.insert(
+                        {std::move(col), result_type, 
iceberg_partition_columns.field().name()});
+            } else {
+                transformed_block.insert(
+                        
iceberg_partition_columns.partition_column_transform().apply(
+                                output_block, 
iceberg_partition_columns.source_idx()));
+            }
         }
         for (int i = 0; i < output_block.rows(); ++i) {
             std::optional<PartitionData> partition_data;
@@ -263,8 +388,7 @@ Status 
VIcebergTableWriter::_filter_block(doris::vectorized::Block& block,
     const ColumnsWithTypeAndName& columns_with_type_and_name =
             block.get_columns_with_type_and_name();
     vectorized::ColumnsWithTypeAndName result_columns;
-    for (int i = 0; i < columns_with_type_and_name.size(); ++i) {
-        const auto& col = columns_with_type_and_name[i];
+    for (const auto& col : columns_with_type_and_name) {
         
result_columns.emplace_back(col.column->clone_resized(col.column->size()), 
col.type,
                                     col.name);
     }
@@ -317,14 +441,24 @@ std::string VIcebergTableWriter::_partition_to_path(const 
doris::iceberg::Struct
     std::stringstream ss;
     for (size_t i = 0; i < _iceberg_partition_columns.size(); i++) {
         auto& iceberg_partition_column = _iceberg_partition_columns[i];
-        std::string value_string =
-                
iceberg_partition_column.partition_column_transform().to_human_string(
-                        
iceberg_partition_column.partition_column_transform().get_result_type(),
-                        data.get(i));
+        const std::string& col_name = iceberg_partition_column.field().name();
+
+        std::string value_string;
+        // In hybrid mode, check if this column is statically specified
+        if (_has_static_partition && _partition_column_is_static[i]) {
+            // Use static partition value
+            value_string = _partition_column_static_values[i];
+        } else {
+            // Compute from data (dynamic partition)
+            value_string = 
iceberg_partition_column.partition_column_transform().to_human_string(
+                    
iceberg_partition_column.partition_column_transform().get_result_type(),
+                    data.get(i));
+        }
+
         if (i > 0) {
             ss << "/";
         }
-        ss << _escape(iceberg_partition_column.field().name()) << '=' << 
_escape(value_string);
+        ss << _escape(col_name) << '=' << _escape(value_string);
     }
 
     return ss.str();
@@ -340,10 +474,18 @@ std::vector<std::string> 
VIcebergTableWriter::_partition_values(
     partition_values.reserve(_iceberg_partition_columns.size());
     for (size_t i = 0; i < _iceberg_partition_columns.size(); i++) {
         auto& iceberg_partition_column = _iceberg_partition_columns[i];
-        partition_values.emplace_back(
-                
iceberg_partition_column.partition_column_transform().get_partition_value(
-                        
iceberg_partition_column.partition_column_transform().get_result_type(),
-                        data.get(i)));
+
+        // In hybrid mode, check if this column is statically specified
+        if (_has_static_partition && _partition_column_is_static[i]) {
+            // Use static partition value
+            partition_values.emplace_back(_partition_column_static_values[i]);
+        } else {
+            // Compute from data (dynamic partition)
+            partition_values.emplace_back(
+                    
iceberg_partition_column.partition_column_transform().get_partition_value(
+                            
iceberg_partition_column.partition_column_transform().get_result_type(),
+                            data.get(i)));
+        }
     }
 
     return partition_values;
@@ -359,7 +501,22 @@ std::shared_ptr<VIcebergPartitionWriter> 
VIcebergTableWriter::_create_partition_
     std::string original_write_path;
     std::string target_path;
 
-    if (transformed_block != nullptr) {
+    // Case 1: Full static partition - use pre-computed static partition path 
and values
+    if (_is_full_static_partition) {
+        partition_values = _static_partition_value_list;
+        if (!_static_partition_path.empty()) {
+            original_write_path = fmt::format("{}/{}", 
iceberg_table_sink.original_output_path,
+                                              _static_partition_path);
+            target_path = fmt::format("{}/{}", output_path, 
_static_partition_path);
+            write_path = fmt::format("{}/{}", output_path, 
_static_partition_path);
+        } else {
+            original_write_path = iceberg_table_sink.original_output_path;
+            target_path = output_path;
+            write_path = output_path;
+        }
+    } else if (transformed_block != nullptr) {
+        // Case 2: Dynamic partition or Hybrid mode (partial static + partial 
dynamic)
+        // _partition_to_path and _partition_values already handle hybrid mode 
internally
         PartitionData partition_data = _get_partition_data(transformed_block, 
position);
         std::string partition_path = _partition_to_path(partition_data);
         partition_values = _partition_values(partition_data);
@@ -368,16 +525,18 @@ std::shared_ptr<VIcebergPartitionWriter> 
VIcebergTableWriter::_create_partition_
         target_path = fmt::format("{}/{}", output_path, partition_path);
         write_path = fmt::format("{}/{}", output_path, partition_path);
     } else {
+        // Case 3: Non-partitioned table
         original_write_path = iceberg_table_sink.original_output_path;
         target_path = output_path;
         write_path = output_path;
     }
 
-    VIcebergPartitionWriter::WriteInfo write_info = {std::move(write_path),
-                                                     
std::move(original_write_path),
-                                                     std::move(target_path),
-                                                     
iceberg_table_sink.file_type,
-                                                     {}};
+    VIcebergPartitionWriter::WriteInfo write_info = {
+            .write_path = std::move(write_path),
+            .original_write_path = std::move(original_write_path),
+            .target_path = std::move(target_path),
+            .file_type = iceberg_table_sink.file_type,
+            .broker_addresses = {}};
     if (iceberg_table_sink.__isset.broker_addresses) {
         
write_info.broker_addresses.assign(iceberg_table_sink.broker_addresses.begin(),
                                            
iceberg_table_sink.broker_addresses.end());
@@ -387,7 +546,7 @@ std::shared_ptr<VIcebergPartitionWriter> 
VIcebergTableWriter::_create_partition_
     std::vector<std::string> column_names;
     column_names.reserve(_write_output_vexpr_ctxs.size());
     for (int i = 0; i < _schema->columns().size(); i++) {
-        if (_non_write_columns_indices.find(i) == 
_non_write_columns_indices.end()) {
+        if (!_non_write_columns_indices.contains(i)) {
             column_names.emplace_back(_schema->columns()[i].field_name());
         }
     }
@@ -406,14 +565,18 @@ PartitionData 
VIcebergTableWriter::_get_partition_data(vectorized::Block* transf
     values.reserve(_iceberg_partition_columns.size());
     int column_idx = 0;
     for (auto& iceberg_partition_column : _iceberg_partition_columns) {
-        const vectorized::ColumnWithTypeAndName& partition_column =
-                transformed_block->get_by_position(column_idx);
-        auto value =
-                
_get_iceberg_partition_value(iceberg_partition_column.partition_column_transform()
-                                                     .get_result_type()
-                                                     ->get_primitive_type(),
-                                             partition_column, position);
-        values.emplace_back(value);
+        if (_has_static_partition && _partition_column_is_static[column_idx]) {
+            values.emplace_back();
+        } else {
+            const vectorized::ColumnWithTypeAndName& partition_column =
+                    transformed_block->get_by_position(column_idx);
+            auto value = _get_iceberg_partition_value(
+                    iceberg_partition_column.partition_column_transform()
+                            .get_result_type()
+                            ->get_primitive_type(),
+                    partition_column, position);
+            values.emplace_back(value);
+        }
         ++column_idx;
     }
     return PartitionData(std::move(values));
@@ -426,11 +589,11 @@ std::any 
VIcebergTableWriter::_get_iceberg_partition_value(
     ColumnPtr col_ptr = 
partition_column.column->convert_to_full_column_if_const();
     CHECK(col_ptr);
     if (col_ptr->is_nullable()) {
-        const ColumnNullable* nullable_column =
+        const auto* nullable_column =
                 reinterpret_cast<const 
vectorized::ColumnNullable*>(col_ptr.get());
-        auto* __restrict null_map_data = 
nullable_column->get_null_map_data().data();
+        const auto* __restrict null_map_data = 
nullable_column->get_null_map_data().data();
         if (null_map_data[position]) {
-            return std::any();
+            return {};
         }
         col_ptr = nullable_column->get_nested_column_ptr();
     }
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h 
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
index 4ea3e2135bb..b6e3c4edc57 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
@@ -97,6 +97,11 @@ private:
     std::string _escape(const std::string& path);
     std::vector<std::string> _partition_values(const 
doris::iceberg::StructLike& data);
 
+    // Initialize static partition values from Thrift config
+    void _init_static_partition_values();
+    // Build static partition path from static partition values
+    std::string _build_static_partition_path();
+
     std::shared_ptr<VIcebergPartitionWriter> _create_partition_writer(
             vectorized::Block* transformed_block, int position,
             const std::string* file_name = nullptr, int file_name_index = 0);
@@ -125,6 +130,22 @@ private:
     std::set<size_t> _non_write_columns_indices;
     std::vector<IcebergPartitionColumn> _iceberg_partition_columns;
 
+    // Static partition values for each partition column (indexed by column 
index)
+    // If _partition_column_is_static[i] is true, this stores the static value.
+    std::vector<std::string> _partition_column_static_values;
+    // Flags to indicate if the partition column at index i is static
+    std::vector<uint8_t> _partition_column_is_static;
+
+    // Whether any static partition columns are specified
+    bool _has_static_partition = false;
+    // Whether ALL partition columns are statically specified (full static 
mode)
+    // If false but _has_static_partition is true, it's partial static 
(hybrid) mode
+    bool _is_full_static_partition = false;
+    // Pre-computed static partition path prefix (for full static mode, this 
is the complete path)
+    std::string _static_partition_path;
+    // Pre-computed static partition value list (for full static mode only)
+    std::vector<std::string> _static_partition_value_list;
+
     std::unordered_map<std::string, std::shared_ptr<VIcebergPartitionWriter>>
             _partitions_to_writers;
 
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index d0caecc2bd6..003f9c36b34 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -988,10 +988,14 @@ partitionSpec
     : TEMPORARY? (PARTITION | PARTITIONS) partitions=identifierList
     | TEMPORARY? PARTITION partition=errorCapturingIdentifier
        | (PARTITION | PARTITIONS) LEFT_PAREN ASTERISK RIGHT_PAREN // for auto 
detect partition in overwriting
-       // TODO: support analyze external table partition spec 
https://github.com/apache/doris/pull/24154
+       | PARTITION LEFT_PAREN partitionKeyValue (COMMA partitionKeyValue)* 
RIGHT_PAREN // static partition: PARTITION (col1='val1', col2='val2')
        // | PARTITIONS WITH RECENT
     ;
 
+partitionKeyValue
+    : identifier EQ expression  // col='value' or col=123
+    ;
+
 partitionTable
     : ((autoPartition=AUTO)? PARTITION BY (RANGE | LIST)? 
partitionList=identityOrFunctionList
        (LEFT_PAREN (partitions=partitionsDef)? RIGHT_PAREN))
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index b1bbcc920cf..502b1eaa7c8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -37,20 +37,27 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.types.Types;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 public class IcebergTransaction implements Transaction {
@@ -258,7 +265,12 @@ public class IcebergTransaction implements Transaction {
         if (updateMode == TUpdateMode.APPEND) {
             commitAppendTxn(pendingResults);
         } else {
-            commitReplaceTxn(pendingResults);
+            // Check if this is a static partition overwrite
+            if (insertCtx != null && insertCtx.isStaticPartitionOverwrite()) {
+                commitStaticPartitionOverwrite(pendingResults);
+            } else {
+                commitReplaceTxn(pendingResults);
+            }
         }
     }
 
@@ -373,4 +385,96 @@ public class IcebergTransaction implements Transaction {
         appendPartitionOp.commit();
     }
 
+    /**
+     * Commit static partition overwrite operation
+     * This method uses OverwriteFiles.overwriteByRowFilter() to overwrite 
only the specified partitions
+     */
+    private void commitStaticPartitionOverwrite(List<WriteResult> 
pendingResults) {
+        Table icebergTable = transaction.table();
+        PartitionSpec spec = icebergTable.spec();
+        Schema schema = icebergTable.schema();
+
+        // Build partition filter expression from static partition values
+        Expression partitionFilter = buildPartitionFilter(
+                insertCtx.getStaticPartitionValues(), spec, schema);
+
+        // Create OverwriteFiles operation
+        OverwriteFiles overwriteFiles = transaction.newOverwrite();
+        if (branchName != null) {
+            overwriteFiles = overwriteFiles.toBranch(branchName);
+        }
+        overwriteFiles = 
overwriteFiles.scanManifestsWith(ops.getThreadPoolWithPreAuth());
+
+        // Set partition filter to overwrite only matching partitions
+        overwriteFiles = overwriteFiles.overwriteByRowFilter(partitionFilter);
+
+        // Add new data files
+        for (WriteResult result : pendingResults) {
+            Preconditions.checkState(result.referencedDataFiles().length == 0,
+                    "Should have no referenced data files for static partition 
overwrite.");
+            Arrays.stream(result.dataFiles()).forEach(overwriteFiles::addFile);
+        }
+
+        // Commit the overwrite operation
+        overwriteFiles.commit();
+    }
+
+    /**
+     * Build partition filter expression from static partition key-value pairs
+     *
+     * @param staticPartitions Map of partition column name to partition value 
(as String)
+     * @param spec PartitionSpec of the table
+     * @param schema Schema of the table
+     * @return Iceberg Expression for partition filtering
+     */
+    private Expression buildPartitionFilter(
+            Map<String, String> staticPartitions,
+            PartitionSpec spec,
+            Schema schema) {
+        if (staticPartitions == null || staticPartitions.isEmpty()) {
+            return Expressions.alwaysTrue();
+        }
+
+        List<Expression> predicates = new ArrayList<>();
+
+        for (PartitionField field : spec.fields()) {
+            String partitionColName = field.name();
+            if (staticPartitions.containsKey(partitionColName)) {
+                String partitionValueStr = 
staticPartitions.get(partitionColName);
+
+                // Get source field to determine the type
+                Types.NestedField sourceField = 
schema.findField(field.sourceId());
+                if (sourceField == null) {
+                    throw new RuntimeException(String.format("Source field not 
found for partition field: %s",
+                        partitionColName));
+                }
+
+                // Convert partition value string to appropriate type
+                Object partitionValue = 
IcebergUtils.parsePartitionValueFromString(
+                        partitionValueStr, sourceField.type());
+
+                // Build equality expression using source field name (not 
partition field name)
+                // For identity partitions, Iceberg requires the source column 
name in expressions
+                String sourceColName = sourceField.name();
+                Expression eqExpr;
+                if (partitionValue == null) {
+                    eqExpr = Expressions.isNull(sourceColName);
+                } else {
+                    eqExpr = Expressions.equal(sourceColName, partitionValue);
+                }
+                predicates.add(eqExpr);
+            }
+        }
+
+        if (predicates.isEmpty()) {
+            return Expressions.alwaysTrue();
+        }
+
+        // Combine all predicates with AND
+        Expression result = predicates.get(0);
+        for (int i = 1; i < predicates.size(); i++) {
+            result = Expressions.and(result, predicates.get(i));
+        }
+        return result;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundIcebergTableSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundIcebergTableSink.java
index 2632bda2310..765b56de1b9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundIcebergTableSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundIcebergTableSink.java
@@ -19,6 +19,7 @@ package org.apache.doris.nereids.analyzer;
 
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
@@ -26,19 +27,24 @@ import 
org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
  * Represent an iceberg table sink plan node that has not been bound.
  */
 public class UnboundIcebergTableSink<CHILD_TYPE extends Plan> extends 
UnboundBaseExternalTableSink<CHILD_TYPE> {
+    // Static partition key-value pairs for INSERT OVERWRITE ... PARTITION
+    // (col='val', ...)
+    private final Map<String, Expression> staticPartitionKeyValues;
 
     public UnboundIcebergTableSink(List<String> nameParts, List<String> 
colNames, List<String> hints,
                                    List<String> partitions, CHILD_TYPE child) {
         this(nameParts, colNames, hints, partitions, DMLCommandType.NONE,
-                Optional.empty(), Optional.empty(), child);
+                Optional.empty(), Optional.empty(), child, null);
     }
 
     /**
@@ -52,8 +58,35 @@ public class UnboundIcebergTableSink<CHILD_TYPE extends 
Plan> extends UnboundBas
                                    Optional<GroupExpression> groupExpression,
                                    Optional<LogicalProperties> 
logicalProperties,
                                    CHILD_TYPE child) {
+        this(nameParts, colNames, hints, partitions, dmlCommandType,
+                groupExpression, logicalProperties, child, null);
+    }
+
+    /**
+     * constructor with static partition
+     */
+    public UnboundIcebergTableSink(List<String> nameParts,
+            List<String> colNames,
+            List<String> hints,
+            List<String> partitions,
+            DMLCommandType dmlCommandType,
+            Optional<GroupExpression> groupExpression,
+            Optional<LogicalProperties> logicalProperties,
+            CHILD_TYPE child,
+            Map<String, Expression> staticPartitionKeyValues) {
         super(nameParts, PlanType.LOGICAL_UNBOUND_ICEBERG_TABLE_SINK, 
ImmutableList.of(), groupExpression,
                 logicalProperties, colNames, dmlCommandType, child, hints, 
partitions);
+        this.staticPartitionKeyValues = staticPartitionKeyValues != null
+                ? ImmutableMap.copyOf(staticPartitionKeyValues)
+                : null;
+    }
+
+    public Map<String, Expression> getStaticPartitionKeyValues() {
+        return staticPartitionKeyValues;
+    }
+
+    public boolean hasStaticPartition() {
+        return staticPartitionKeyValues != null && 
!staticPartitionKeyValues.isEmpty();
     }
 
     @Override
@@ -61,7 +94,7 @@ public class UnboundIcebergTableSink<CHILD_TYPE extends Plan> 
extends UnboundBas
         Preconditions.checkArgument(children.size() == 1,
                 "UnboundIcebergTableSink only accepts one child");
         return new UnboundIcebergTableSink<>(nameParts, colNames, hints, 
partitions,
-                dmlCommandType, groupExpression, Optional.empty(), 
children.get(0));
+                dmlCommandType, groupExpression, Optional.empty(), 
children.get(0), staticPartitionKeyValues);
     }
 
     @Override
@@ -72,13 +105,14 @@ public class UnboundIcebergTableSink<CHILD_TYPE extends 
Plan> extends UnboundBas
     @Override
     public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
         return new UnboundIcebergTableSink<>(nameParts, colNames, hints, 
partitions,
-                dmlCommandType, groupExpression, 
Optional.of(getLogicalProperties()), child());
+                dmlCommandType, groupExpression, 
Optional.of(getLogicalProperties()), child(),
+                staticPartitionKeyValues);
     }
 
     @Override
     public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
             Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
         return new UnboundIcebergTableSink<>(nameParts, colNames, hints, 
partitions,
-                dmlCommandType, groupExpression, logicalProperties, 
children.get(0));
+                dmlCommandType, groupExpression, logicalProperties, 
children.get(0), staticPartitionKeyValues);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
index a5dfa917fe3..e579c12d82a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
@@ -27,6 +27,7 @@ import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
 import org.apache.doris.dictionary.Dictionary;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.exceptions.ParseException;
+import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
@@ -38,6 +39,7 @@ import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
 import com.google.common.collect.ImmutableList;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -70,6 +72,18 @@ public class UnboundTableSinkCreator {
                 List<String> colNames, List<String> hints, boolean 
temporaryPartition, List<String> partitions,
                 boolean isPartialUpdate, TPartialUpdateNewRowPolicy 
partialUpdateNewKeyPolicy,
                 DMLCommandType dmlCommandType, LogicalPlan plan) {
+        return createUnboundTableSink(nameParts, colNames, hints, 
temporaryPartition, partitions,
+                isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType, 
plan, null);
+    }
+
+    /**
+     * create unbound sink for DML plan with static partition support for 
Iceberg.
+     */
+    public static LogicalSink<? extends Plan> 
createUnboundTableSink(List<String> nameParts,
+            List<String> colNames, List<String> hints, boolean 
temporaryPartition, List<String> partitions,
+            boolean isPartialUpdate, TPartialUpdateNewRowPolicy 
partialUpdateNewKeyPolicy,
+            DMLCommandType dmlCommandType, LogicalPlan plan,
+            Map<String, Expression> staticPartitionKeyValues) {
         String catalogName = 
RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0);
         CatalogIf<?> curCatalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
         if (curCatalog instanceof InternalCatalog) {
@@ -81,7 +95,7 @@ public class UnboundTableSinkCreator {
                     dmlCommandType, Optional.empty(), Optional.empty(), plan);
         } else if (curCatalog instanceof IcebergExternalCatalog) {
             return new UnboundIcebergTableSink<>(nameParts, colNames, hints, 
partitions,
-                    dmlCommandType, Optional.empty(), Optional.empty(), plan);
+                    dmlCommandType, Optional.empty(), Optional.empty(), plan, 
staticPartitionKeyValues);
         } else if (curCatalog instanceof JdbcExternalCatalog) {
             return new UnboundJdbcTableSink<>(nameParts, colNames, hints, 
partitions,
                     dmlCommandType, Optional.empty(), Optional.empty(), plan);
@@ -90,13 +104,15 @@ public class UnboundTableSinkCreator {
     }
 
     /**
-     * create unbound sink for DML plan with auto detect overwrite partition 
enable.
+     * create unbound sink for DML plan with auto detect overwrite partition 
enable
+     * and static partition support for Iceberg.
+     * TODO: staticPartitionKeyValues is only used for Iceberg, support other 
catalog types in future.
      */
     public static LogicalSink<? extends Plan> 
createUnboundTableSinkMaybeOverwrite(List<String> nameParts,
             List<String> colNames, List<String> hints, boolean 
temporaryPartition, List<String> partitions,
             boolean isAutoDetectPartition, boolean isOverwrite, boolean 
isPartialUpdate,
             TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, 
DMLCommandType dmlCommandType,
-            LogicalPlan plan) {
+            LogicalPlan plan, Map<String, Expression> 
staticPartitionKeyValues) {
         if (isAutoDetectPartition) { // partitions is null
             if (!isOverwrite) {
                 throw new ParseException("ASTERISK is only supported in 
overwrite partition for OLAP table");
@@ -117,7 +133,7 @@ public class UnboundTableSinkCreator {
                     dmlCommandType, Optional.empty(), Optional.empty(), plan);
         } else if (curCatalog instanceof IcebergExternalCatalog && 
!isAutoDetectPartition) {
             return new UnboundIcebergTableSink<>(nameParts, colNames, hints, 
partitions,
-                    dmlCommandType, Optional.empty(), Optional.empty(), plan);
+                    dmlCommandType, Optional.empty(), Optional.empty(), plan, 
staticPartitionKeyValues);
         } else if (curCatalog instanceof JdbcExternalCatalog) {
             return new UnboundJdbcTableSink<>(nameParts, colNames, hints, 
partitions,
                     dmlCommandType, Optional.empty(), Optional.empty(), plan);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/InsertPartitionSpec.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/InsertPartitionSpec.java
new file mode 100644
index 00000000000..6b9e7d7b24c
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/InsertPartitionSpec.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.parser;
+
+import org.apache.doris.nereids.trees.expressions.Expression;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Unified partition specification for INSERT statements.
+ * Supports multiple partition modes:
+ * 1. Auto-detect: PARTITION (*) - partitions = null
+ * 2. Dynamic partition by name: PARTITION (p1, p2) - partitionNames is
+ * non-empty
+ * 3. Static partition with values: PARTITION (col='val', ...) -
+ * staticPartitionValues is non-empty
+ * 4. No partition specified: all fields are empty/default
+ */
+public class InsertPartitionSpec {
+
+    /** Static partition key-value pairs: col_name -> value expression */
+    private final Map<String, Expression> staticPartitionValues;
+
+    /** Dynamic partition names (e.g., PARTITION (p1, p2)) */
+    private final List<String> partitionNames;
+
+    /** Whether it's a temporary partition */
+    private final boolean isTemporary;
+
+    /** Whether it's auto-detect mode (PARTITION (*)) */
+    private final boolean isAutoDetect;
+
+    private InsertPartitionSpec(Map<String, Expression> staticPartitionValues,
+            List<String> partitionNames, boolean isTemporary, boolean 
isAutoDetect) {
+        this.staticPartitionValues = staticPartitionValues != null
+                ? ImmutableMap.copyOf(staticPartitionValues)
+                : ImmutableMap.of();
+        this.partitionNames = partitionNames != null
+                ? ImmutableList.copyOf(partitionNames)
+                : ImmutableList.of();
+        this.isTemporary = isTemporary;
+        this.isAutoDetect = isAutoDetect;
+    }
+
+    /** No partition specified */
+    public static InsertPartitionSpec none() {
+        return new InsertPartitionSpec(null, null, false, false);
+    }
+
+    /** Auto-detect partition: PARTITION (*) */
+    public static InsertPartitionSpec autoDetect(boolean isTemporary) {
+        return new InsertPartitionSpec(null, null, isTemporary, true);
+    }
+
+    /** Dynamic partition by name: PARTITION (p1, p2) */
+    public static InsertPartitionSpec dynamicPartition(List<String> 
partitionNames, boolean isTemporary) {
+        return new InsertPartitionSpec(null, partitionNames, isTemporary, 
false);
+    }
+
+    /** Static partition with values: PARTITION (col='val', ...) */
+    public static InsertPartitionSpec staticPartition(Map<String, Expression> 
staticValues, boolean isTemporary) {
+        return new InsertPartitionSpec(staticValues, null, isTemporary, false);
+    }
+
+    /** Check if this is a static partition with key-value pairs */
+    public boolean isStaticPartition() {
+        return staticPartitionValues != null && 
!staticPartitionValues.isEmpty();
+    }
+
+    /** Check if this has dynamic partition names */
+    public boolean hasDynamicPartitionNames() {
+        return partitionNames != null && !partitionNames.isEmpty();
+    }
+
+    public Map<String, Expression> getStaticPartitionValues() {
+        return staticPartitionValues;
+    }
+
+    public List<String> getPartitionNames() {
+        return partitionNames;
+    }
+
+    public boolean isTemporary() {
+        return isTemporary;
+    }
+
+    public boolean isAutoDetect() {
+        return isAutoDetect;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index da410826cd5..427dd78a231 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -1347,27 +1347,26 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
             branchName = Optional.of(ctx.optSpecBranch().name.getText());
         }
         List<String> colNames = ctx.cols == null ? ImmutableList.of() : 
visitIdentifierList(ctx.cols);
-        // TODO visit partitionSpecCtx
         LogicalPlan plan = visitQuery(ctx.query());
-        // partitionSpec may be NULL. means auto detect partition. only 
available when IOT
-        Pair<Boolean, List<String>> partitionSpec = 
visitPartitionSpec(ctx.partitionSpec());
-        // partitionSpec.second :
-        // null - auto detect
-        // zero - whole table
-        // others - specific partitions
-        boolean isAutoDetect = partitionSpec.second == null;
+
+        // Parse partition specification using unified method
+        InsertPartitionSpec partitionSpec = 
parseInsertPartitionSpec(ctx.partitionSpec());
+
+        // Unified sink creation for all catalog types (including Iceberg 
static
+        // partition)
         LogicalSink<?> sink = 
UnboundTableSinkCreator.createUnboundTableSinkMaybeOverwrite(
                 tableName.build(),
                 colNames,
-                ImmutableList.of(), // hints
-                partitionSpec.first, // isTemp
-                partitionSpec.second, // partition names
-                isAutoDetect,
+                ImmutableList.of(),
+                partitionSpec.isTemporary(),
+                partitionSpec.isAutoDetect() ? null : 
partitionSpec.getPartitionNames(),
+                partitionSpec.isAutoDetect(),
                 isOverwrite,
                 
ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(),
                 
ConnectContext.get().getSessionVariable().getPartialUpdateNewRowPolicy(),
                 ctx.tableId == null ? DMLCommandType.INSERT : 
DMLCommandType.GROUP_COMMIT,
-                plan);
+                plan,
+                partitionSpec.isStaticPartition() ? 
partitionSpec.getStaticPartitionValues() : null);
         Optional<LogicalPlan> cte = Optional.empty();
         if (ctx.cte() != null) {
             cte = Optional.ofNullable(withCte(plan, ctx.cte()));
@@ -1446,6 +1445,11 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
                 partitions = null;
             } else if (ctx.partition != null) {
                 partitions = ImmutableList.of(ctx.partition.getText());
+            } else if (ctx.partitionKeyValue() != null && 
!ctx.partitionKeyValue().isEmpty()) {
+                // Static partition: PARTITION (col='val', ...)
+                throw new ParseException(
+                        "Static partition syntax PARTITION (col='val', ...) is 
only supported in INSERT statements",
+                        ctx);
             } else {
                 partitions = visitIdentifierList(ctx.partitions);
             }
@@ -1453,6 +1457,52 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
         return Pair.of(temporaryPartition, partitions);
     }
 
+    /**
+     * Parse partition specification for INSERT statements.
+     * Returns a unified InsertPartitionSpec that handles all partition modes:
+     * - Auto-detect: PARTITION (*)
+     * - Dynamic partition by name: PARTITION (p1, p2)
+     * - Static partition with values: PARTITION (col='val', ...)
+     * - No partition specified
+     */
+    private InsertPartitionSpec parseInsertPartitionSpec(PartitionSpecContext 
ctx) {
+        if (ctx == null) {
+            return InsertPartitionSpec.none();
+        }
+
+        boolean isTemporary = ctx.TEMPORARY() != null;
+
+        // PARTITION (*)
+        if (ctx.ASTERISK() != null) {
+            return InsertPartitionSpec.autoDetect(isTemporary);
+        }
+
+        // PARTITION partition_name (single partition)
+        if (ctx.partition != null) {
+            return InsertPartitionSpec.dynamicPartition(
+                    ImmutableList.of(ctx.partition.getText()), isTemporary);
+        }
+
+        // PARTITION (col1='val1', col2='val2') - static partition
+        if (ctx.partitionKeyValue() != null && 
!ctx.partitionKeyValue().isEmpty()) {
+            Map<String, Expression> staticValues = Maps.newLinkedHashMap();
+            for (DorisParser.PartitionKeyValueContext kvCtx : 
ctx.partitionKeyValue()) {
+                String colName = kvCtx.identifier().getText();
+                Expression valueExpr = typedVisit(kvCtx.expression());
+                staticValues.put(colName, valueExpr);
+            }
+            return InsertPartitionSpec.staticPartition(staticValues, 
isTemporary);
+        }
+
+        // PARTITION (p1, p2, ...) - dynamic partition list
+        if (ctx.partitions != null) {
+            List<String> partitionNames = visitIdentifierList(ctx.partitions);
+            return InsertPartitionSpec.dynamicPartition(partitionNames, 
isTemporary);
+        }
+
+        return InsertPartitionSpec.none();
+    }
+
     @Override
     public Command visitCreateMTMV(CreateMTMVContext ctx) {
         if (ctx.buildMode() == null && ctx.refreshMethod() == null && 
ctx.refreshTrigger() == null
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
index df6dec2db43..8cf2e105816 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
@@ -102,12 +102,17 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -603,9 +608,27 @@ public class BindSink implements AnalysisRuleFactory {
         IcebergExternalTable table = pair.second;
         LogicalPlan child = ((LogicalPlan) sink.child());
 
+        // Get static partition columns if present
+        Map<String, Expression> staticPartitions = 
sink.getStaticPartitionKeyValues();
+        Set<String> staticPartitionColNames = staticPartitions != null
+                ? staticPartitions.keySet()
+                : Sets.newHashSet();
+
+        // Validate static partition if present
+        if (sink.hasStaticPartition()) {
+            validateStaticPartition(sink, table);
+        }
+
+        // Build bindColumns: exclude static partition columns from the 
columns that
+        // need to come from SELECT
+        // Because static partition column values come from PARTITION clause, 
not from
+        // SELECT
         List<Column> bindColumns;
         if (sink.getColNames().isEmpty()) {
-            bindColumns = 
table.getBaseSchema(true).stream().collect(ImmutableList.toImmutableList());
+            // When no column names specified, include all 
non-static-partition columns
+            bindColumns = table.getBaseSchema(true).stream()
+                    .filter(col -> 
!staticPartitionColNames.contains(col.getName()))
+                    .collect(ImmutableList.toImmutableList());
         } else {
             bindColumns = sink.getColNames().stream().map(cn -> {
                 Column column = table.getColumn(cn);
@@ -616,6 +639,7 @@ public class BindSink implements AnalysisRuleFactory {
                 return column;
             }).collect(ImmutableList.toImmutableList());
         }
+
         LogicalIcebergTableSink<?> boundSink = new LogicalIcebergTableSink<>(
                 database,
                 table,
@@ -627,16 +651,92 @@ public class BindSink implements AnalysisRuleFactory {
                 Optional.empty(),
                 Optional.empty(),
                 child);
-        // we need to insert all the columns of the target table
+
+        // Check column count: SELECT columns should match bindColumns 
(excluding static
+        // partition columns)
         if (boundSink.getCols().size() != child.getOutput().size()) {
-            throw new AnalysisException("insert into cols should be 
corresponding to the query output");
+            throw new AnalysisException("insert into cols should be 
corresponding to the query output. "
+                    + "Expected " + boundSink.getCols().size() + " columns but 
got " + child.getOutput().size());
         }
+
         Map<String, NamedExpression> columnToOutput = getColumnToOutput(ctx, 
table, false,
                 boundSink, child);
+
+        // For static partition columns, add constant expressions from 
PARTITION clause
+        // This ensures partition column values are written to the data file
+        if (!staticPartitionColNames.isEmpty()) {
+            for (Map.Entry<String, Expression> entry : 
staticPartitions.entrySet()) {
+                String colName = entry.getKey();
+                Expression valueExpr = entry.getValue();
+                Column column = table.getColumn(colName);
+                if (column != null) {
+                    // Cast the literal to the correct column type
+                    Expression castExpr = TypeCoercionUtils.castIfNotSameType(
+                            valueExpr, 
DataType.fromCatalogType(column.getType()));
+                    columnToOutput.put(colName, new Alias(castExpr, colName));
+                }
+            }
+        }
+
         LogicalProject<?> fullOutputProject = 
getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput);
         return boundSink.withChildAndUpdateOutput(fullOutputProject);
     }
 
+    /**
+     * Validate static partition specification for Iceberg table
+     */
+    private void validateStaticPartition(UnboundIcebergTableSink<?> sink, 
IcebergExternalTable table) {
+        Map<String, Expression> staticPartitions = 
sink.getStaticPartitionKeyValues();
+        if (staticPartitions == null || staticPartitions.isEmpty()) {
+            return;
+        }
+
+        Table icebergTable = table.getIcebergTable();
+        PartitionSpec partitionSpec = icebergTable.spec();
+
+        // Check if table is partitioned
+        if (!partitionSpec.isPartitioned()) {
+            throw new AnalysisException(
+                    String.format("Table %s is not partitioned, cannot use 
static partition syntax", table.getName()));
+        }
+
+        // Get partition field names
+        Map<String, PartitionField> partitionFieldMap = Maps.newHashMap();
+        for (PartitionField field : partitionSpec.fields()) {
+            String fieldName = field.name();
+            partitionFieldMap.put(fieldName, field);
+        }
+
+        // Validate each static partition column
+        for (Map.Entry<String, Expression> entry : 
staticPartitions.entrySet()) {
+            String partitionColName = entry.getKey();
+            Expression partitionValue = entry.getValue();
+
+            // 1. Check if partition column exists
+            if (!partitionFieldMap.containsKey(partitionColName)) {
+                throw new AnalysisException(
+                        String.format("Unknown partition column '%s' in table 
'%s'. Available partition columns: %s",
+                                partitionColName, table.getName(), 
partitionFieldMap.keySet()));
+            }
+
+            // 2. Check if it's an identity partition.
+            // Static partition overwrite is only supported for identity partitions.
+            PartitionField field = partitionFieldMap.get(partitionColName);
+            if (!field.transform().isIdentity()) {
+                throw new AnalysisException(
+                        String.format("Cannot use static partition syntax for 
non-identity partition field '%s'"
+                                + " (transform: %s).", partitionColName, 
field.transform().toString()));
+            }
+
+            // 3. Validate partition value type must be a literal
+            if (!(partitionValue instanceof Literal)) {
+                throw new AnalysisException(
+                        String.format("Partition value for column '%s' must be 
a literal, but got: %s",
+                                partitionColName, partitionValue));
+            }
+        }
+    }
+
     private Plan bindJdbcTableSink(MatchingContext<UnboundJdbcTableSink<Plan>> ctx) {
         UnboundJdbcTableSink<?> sink = ctx.root;
         Pair<JdbcExternalDatabase, JdbcExternalTable> pair = bind(ctx.cascadesContext, sink);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertCommandContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertCommandContext.java
index 6a921cee10c..45bcb5092c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertCommandContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertCommandContext.java
@@ -17,6 +17,9 @@
 
 package org.apache.doris.nereids.trees.plans.commands.insert;
 
+import com.google.common.collect.Maps;
+
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -24,6 +27,9 @@ import java.util.Optional;
  */
 public class IcebergInsertCommandContext extends BaseExternalTableInsertCommandContext {
     private Optional<String> branchName = Optional.empty();
+    // Static partition key-value pairs for INSERT OVERWRITE ... PARTITION
+    // (col='val', ...)
+    private Map<String, String> staticPartitionValues = Maps.newHashMap();
 
     public Optional<String> getBranchName() {
         return branchName;
@@ -32,4 +38,21 @@ public class IcebergInsertCommandContext extends BaseExternalTableInsertCommandC
     public void setBranchName(Optional<String> branchName) {
         this.branchName = branchName;
     }
+
+    public Map<String, String> getStaticPartitionValues() {
+        return staticPartitionValues;
+    }
+
+    public void setStaticPartitionValues(Map<String, String> staticPartitionValues) {
+        this.staticPartitionValues = staticPartitionValues != null
+                ? Maps.newHashMap(staticPartitionValues)
+                : Maps.newHashMap();
+    }
+
+    /**
+     * Check if this is a static partition overwrite
+     */
+    public boolean isStaticPartitionOverwrite() {
+        return isOverwrite() && !staticPartitionValues.isEmpty();
+    }
 }
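
    A minimal sketch (not part of the patch) of how the planner side is expected to drive this
    context, using only the accessors added above; the column/value pairs are illustrative and
    mirror the regression tests later in this commit:

    ```java
    package org.apache.doris.nereids.trees.plans.commands.insert;

    import com.google.common.collect.Maps;

    import java.util.Map;

    // Illustrative only: exercises the context API added in this file.
    class IcebergStaticOverwriteContextSketch {
        static boolean example() {
            IcebergInsertCommandContext ctx = new IcebergInsertCommandContext();
            ctx.setOverwrite(true);

            // Static partition column -> literal value rendered as a string,
            // e.g. taken from PARTITION (dt='2025-01-25', region='bj').
            Map<String, String> staticValues = Maps.newHashMap();
            staticValues.put("dt", "2025-01-25");
            staticValues.put("region", "bj");
            ctx.setStaticPartitionValues(staticValues);

            // true only when overwrite is enabled AND static values are present
            return ctx.isStaticPartitionOverwrite();
        }
    }
    ```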
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
index b8b20f61527..e0ba01277b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
@@ -43,6 +43,8 @@ import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.glue.LogicalPlanAdapter;
 import org.apache.doris.nereids.properties.PhysicalProperties;
 import org.apache.doris.nereids.trees.TreeNode;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
 import org.apache.doris.nereids.trees.plans.Explainable;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
@@ -65,6 +67,7 @@ import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -72,6 +75,7 @@ import org.awaitility.Awaitility;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -366,9 +370,11 @@ public class InsertOverwriteTableCommand extends Command implements NeedAuditEnc
                     false,
                     TPartialUpdateNewRowPolicy.APPEND,
                     sink.getDMLCommandType(),
-                    (LogicalPlan) (sink.child(0)));
+                    (LogicalPlan) (sink.child(0)),
+                    sink.getStaticPartitionKeyValues());
             insertCtx = new IcebergInsertCommandContext();
             ((IcebergInsertCommandContext) insertCtx).setOverwrite(true);
+            setStaticPartitionToContext(sink, (IcebergInsertCommandContext) insertCtx);
             branchName.ifPresent(notUsed -> ((IcebergInsertCommandContext) insertCtx).setBranchName(branchName));
         } else {
             throw new UserException("Current catalog does not support insert 
overwrite yet.");
@@ -394,16 +400,33 @@ public class InsertOverwriteTableCommand extends Command 
implements NeedAuditEnc
         } else if (logicalQuery instanceof UnboundHiveTableSink) {
             insertCtx = new HiveInsertCommandContext();
             ((HiveInsertCommandContext) insertCtx).setOverwrite(true);
-        } else if (logicalQuery instanceof UnboundIcebergTableSink) {
-            insertCtx = new IcebergInsertCommandContext();
-            ((IcebergInsertCommandContext) insertCtx).setOverwrite(true);
-            branchName.ifPresent(notUsed -> ((IcebergInsertCommandContext) insertCtx).setBranchName(branchName));
         } else {
-            throw new UserException("Current catalog does not support insert overwrite yet.");
+            throw new UserException("Current catalog does not support insert overwrite with auto-detect partition.");
         }
         runInsertCommand(logicalQuery, insertCtx, ctx, executor);
     }
 
+    /**
+     * Extract static partition information from sink and set to context.
+     */
+    private void setStaticPartitionToContext(UnboundIcebergTableSink<?> sink,
+            IcebergInsertCommandContext insertCtx) {
+        if (sink.hasStaticPartition()) {
+            Map<String, Expression> staticPartitions = sink.getStaticPartitionKeyValues();
+            Map<String, String> staticPartitionValues = Maps.newHashMap();
+            for (Map.Entry<String, Expression> entry : staticPartitions.entrySet()) {
+                Expression expr = entry.getValue();
+                if (expr instanceof Literal) {
+                    staticPartitionValues.put(entry.getKey(), ((Literal) expr).getStringValue());
+                } else {
+                    throw new AnalysisException(
+                            String.format("Static partition value must be a 
literal, but got: %s", expr));
+                }
+            }
+            insertCtx.setStaticPartitionValues(staticPartitionValues);
+        }
+    }
+
     @Override
     public Plan getExplainPlan(ConnectContext ctx) {
         Optional<CascadesContext> analyzeContext = Optional.of(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
index c536844c649..3968ada89ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
@@ -161,6 +161,14 @@ public class IcebergTableSink extends BaseExternalTableDataSink {
         if (insertCtx.isPresent()) {
             IcebergInsertCommandContext context = (IcebergInsertCommandContext) insertCtx.get();
             tSink.setOverwrite(context.isOverwrite());
+
+            // Pass static partition values to BE for static partition overwrite
+            if (context.isStaticPartitionOverwrite()) {
+                Map<String, String> staticPartitionValues = context.getStaticPartitionValues();
+                if (staticPartitionValues != null && !staticPartitionValues.isEmpty()) {
+                    tSink.setStaticPartitionValues(staticPartitionValues);
+                }
+            }
         }
         tDataSink = new TDataSink(TDataSinkType.ICEBERG_TABLE_SINK);
         tDataSink.setIcebergTableSink(tSink);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/InsertPartitionSpecTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/InsertPartitionSpecTest.java
new file mode 100644
index 00000000000..a6f3402cade
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/InsertPartitionSpecTest.java
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.parser;
+
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.StringLiteral;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Unit tests for {@link InsertPartitionSpec}.
+ */
+public class InsertPartitionSpecTest {
+
+    @Test
+    public void testNone() {
+        InsertPartitionSpec spec = InsertPartitionSpec.none();
+
+        Assertions.assertFalse(spec.isStaticPartition());
+        Assertions.assertFalse(spec.hasDynamicPartitionNames());
+        Assertions.assertFalse(spec.isTemporary());
+        Assertions.assertFalse(spec.isAutoDetect());
+        Assertions.assertTrue(spec.getStaticPartitionValues().isEmpty());
+        Assertions.assertTrue(spec.getPartitionNames().isEmpty());
+    }
+
+    @Test
+    public void testAutoDetect() {
+        // Non-temporary auto-detect
+        InsertPartitionSpec spec = InsertPartitionSpec.autoDetect(false);
+
+        Assertions.assertFalse(spec.isStaticPartition());
+        Assertions.assertFalse(spec.hasDynamicPartitionNames());
+        Assertions.assertFalse(spec.isTemporary());
+        Assertions.assertTrue(spec.isAutoDetect());
+        Assertions.assertTrue(spec.getStaticPartitionValues().isEmpty());
+        Assertions.assertTrue(spec.getPartitionNames().isEmpty());
+    }
+
+    @Test
+    public void testAutoDetectTemporary() {
+        // Temporary auto-detect
+        InsertPartitionSpec spec = InsertPartitionSpec.autoDetect(true);
+
+        Assertions.assertFalse(spec.isStaticPartition());
+        Assertions.assertFalse(spec.hasDynamicPartitionNames());
+        Assertions.assertTrue(spec.isTemporary());
+        Assertions.assertTrue(spec.isAutoDetect());
+    }
+
+    @Test
+    public void testDynamicPartition() {
+        List<String> partitionNames = ImmutableList.of("p1", "p2", "p3");
+        InsertPartitionSpec spec = InsertPartitionSpec.dynamicPartition(partitionNames, false);
+
+        Assertions.assertFalse(spec.isStaticPartition());
+        Assertions.assertTrue(spec.hasDynamicPartitionNames());
+        Assertions.assertFalse(spec.isTemporary());
+        Assertions.assertFalse(spec.isAutoDetect());
+        Assertions.assertEquals(3, spec.getPartitionNames().size());
+        Assertions.assertEquals("p1", spec.getPartitionNames().get(0));
+        Assertions.assertEquals("p2", spec.getPartitionNames().get(1));
+        Assertions.assertEquals("p3", spec.getPartitionNames().get(2));
+    }
+
+    @Test
+    public void testDynamicPartitionTemporary() {
+        List<String> partitionNames = ImmutableList.of("p1");
+        InsertPartitionSpec spec = InsertPartitionSpec.dynamicPartition(partitionNames, true);
+
+        Assertions.assertFalse(spec.isStaticPartition());
+        Assertions.assertTrue(spec.hasDynamicPartitionNames());
+        Assertions.assertTrue(spec.isTemporary());
+        Assertions.assertFalse(spec.isAutoDetect());
+        Assertions.assertEquals(1, spec.getPartitionNames().size());
+    }
+
+    @Test
+    public void testDynamicPartitionEmpty() {
+        List<String> partitionNames = ImmutableList.of();
+        InsertPartitionSpec spec = InsertPartitionSpec.dynamicPartition(partitionNames, false);
+
+        Assertions.assertFalse(spec.isStaticPartition());
+        Assertions.assertFalse(spec.hasDynamicPartitionNames());
+        Assertions.assertFalse(spec.isTemporary());
+        Assertions.assertFalse(spec.isAutoDetect());
+        Assertions.assertTrue(spec.getPartitionNames().isEmpty());
+    }
+
+    @Test
+    public void testDynamicPartitionNull() {
+        InsertPartitionSpec spec = InsertPartitionSpec.dynamicPartition(null, false);
+
+        Assertions.assertFalse(spec.isStaticPartition());
+        Assertions.assertFalse(spec.hasDynamicPartitionNames());
+        Assertions.assertTrue(spec.getPartitionNames().isEmpty());
+    }
+
+    @Test
+    public void testStaticPartition() {
+        Map<String, Expression> staticValues = ImmutableMap.of(
+                "dt", new StringLiteral("2025-01-25"),
+                "region", new StringLiteral("bj"));
+        InsertPartitionSpec spec = InsertPartitionSpec.staticPartition(staticValues, false);
+
+        Assertions.assertTrue(spec.isStaticPartition());
+        Assertions.assertFalse(spec.hasDynamicPartitionNames());
+        Assertions.assertFalse(spec.isTemporary());
+        Assertions.assertFalse(spec.isAutoDetect());
+        Assertions.assertEquals(2, spec.getStaticPartitionValues().size());
+        Assertions.assertTrue(spec.getStaticPartitionValues().containsKey("dt"));
+        Assertions.assertTrue(spec.getStaticPartitionValues().containsKey("region"));
+    }
+
+    @Test
+    public void testStaticPartitionTemporary() {
+        Map<String, Expression> staticValues = ImmutableMap.of(
+                "year", new IntegerLiteral(2025));
+        InsertPartitionSpec spec = InsertPartitionSpec.staticPartition(staticValues, true);
+
+        Assertions.assertTrue(spec.isStaticPartition());
+        Assertions.assertFalse(spec.hasDynamicPartitionNames());
+        Assertions.assertTrue(spec.isTemporary());
+        Assertions.assertFalse(spec.isAutoDetect());
+        Assertions.assertEquals(1, spec.getStaticPartitionValues().size());
+    }
+
+    @Test
+    public void testStaticPartitionEmpty() {
+        Map<String, Expression> staticValues = ImmutableMap.of();
+        InsertPartitionSpec spec = InsertPartitionSpec.staticPartition(staticValues, false);
+
+        Assertions.assertFalse(spec.isStaticPartition());
+        Assertions.assertFalse(spec.hasDynamicPartitionNames());
+        Assertions.assertTrue(spec.getStaticPartitionValues().isEmpty());
+    }
+
+    @Test
+    public void testStaticPartitionNull() {
+        InsertPartitionSpec spec = InsertPartitionSpec.staticPartition(null, false);
+
+        Assertions.assertFalse(spec.isStaticPartition());
+        Assertions.assertTrue(spec.getStaticPartitionValues().isEmpty());
+    }
+
+    @Test
+    public void testImmutability() {
+        // Test that returned collections are immutable
+        Map<String, Expression> staticValues = ImmutableMap.of(
+                "dt", new StringLiteral("2025-01-25"));
+        List<String> partitionNames = ImmutableList.of("p1");
+
+        InsertPartitionSpec staticSpec = InsertPartitionSpec.staticPartition(staticValues, false);
+        InsertPartitionSpec dynamicSpec = InsertPartitionSpec.dynamicPartition(partitionNames, false);
+
+        // Verify collections are immutable (should throw UnsupportedOperationException)
+        Assertions.assertThrows(UnsupportedOperationException.class, () -> {
+            staticSpec.getStaticPartitionValues().put("new_key", new StringLiteral("value"));
+        });
+
+        Assertions.assertThrows(UnsupportedOperationException.class, () -> {
+            dynamicSpec.getPartitionNames().add("p2");
+        });
+    }
+
+    @Test
+    public void testMutualExclusivity() {
+        // Static partition should not have dynamic partition names
+        Map<String, Expression> staticValues = ImmutableMap.of(
+                "dt", new StringLiteral("2025-01-25"));
+        InsertPartitionSpec staticSpec = InsertPartitionSpec.staticPartition(staticValues, false);
+        Assertions.assertTrue(staticSpec.isStaticPartition());
+        Assertions.assertFalse(staticSpec.hasDynamicPartitionNames());
+        Assertions.assertFalse(staticSpec.isAutoDetect());
+
+        // Dynamic partition should not be static
+        List<String> partitionNames = ImmutableList.of("p1", "p2");
+        InsertPartitionSpec dynamicSpec = InsertPartitionSpec.dynamicPartition(partitionNames, false);
+        Assertions.assertFalse(dynamicSpec.isStaticPartition());
+        Assertions.assertTrue(dynamicSpec.hasDynamicPartitionNames());
+        Assertions.assertFalse(dynamicSpec.isAutoDetect());
+
+        // Auto-detect should not be static or have dynamic names
+        InsertPartitionSpec autoSpec = InsertPartitionSpec.autoDetect(false);
+        Assertions.assertFalse(autoSpec.isStaticPartition());
+        Assertions.assertFalse(autoSpec.hasDynamicPartitionNames());
+        Assertions.assertTrue(autoSpec.isAutoDetect());
+
+        // None should have nothing
+        InsertPartitionSpec noneSpec = InsertPartitionSpec.none();
+        Assertions.assertFalse(noneSpec.isStaticPartition());
+        Assertions.assertFalse(noneSpec.hasDynamicPartitionNames());
+        Assertions.assertFalse(noneSpec.isAutoDetect());
+    }
+}
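
    For quick reference, a hedged sketch of how the factory methods exercised above line up with
    the supported PARTITION clause shapes; the wrapper class is illustrative, and only factories
    and literal types already shown in the tests are used:

    ```java
    package org.apache.doris.nereids.parser;

    import org.apache.doris.nereids.trees.expressions.Expression;
    import org.apache.doris.nereids.trees.expressions.literal.StringLiteral;

    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableMap;

    import java.util.Map;

    // Illustrative only: which factory method backs which INSERT ... PARTITION shape.
    class InsertPartitionSpecUsageSketch {
        static void examples() {
            // INSERT INTO tbl ...                       -> no partition clause
            InsertPartitionSpec none = InsertPartitionSpec.none();

            // INSERT OVERWRITE TABLE tbl PARTITION (*)  -> auto-detect partitions
            InsertPartitionSpec autoDetect = InsertPartitionSpec.autoDetect(false);

            // INSERT INTO tbl PARTITION (p1, p2)        -> named (dynamic) partitions
            InsertPartitionSpec dynamic =
                    InsertPartitionSpec.dynamicPartition(ImmutableList.of("p1", "p2"), false);

            // INSERT OVERWRITE TABLE tbl PARTITION (dt='2025-01-25') -> static key/value pairs
            Map<String, Expression> staticValues =
                    ImmutableMap.of("dt", new StringLiteral("2025-01-25"));
            InsertPartitionSpec staticSpec = InsertPartitionSpec.staticPartition(staticValues, false);
        }
    }
    ```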
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/ParseInsertPartitionSpecTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/ParseInsertPartitionSpecTest.java
new file mode 100644
index 00000000000..185141d6875
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/ParseInsertPartitionSpecTest.java
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.parser;
+
+import org.apache.doris.nereids.DorisParser;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.literal.StringLikeLiteral;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Maps;
+import mockit.Mock;
+import mockit.MockUp;
+import org.antlr.v4.runtime.ParserRuleContext;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+
+/**
+ * Unit tests for LogicalPlanBuilder.parseInsertPartitionSpec() method.
+ */
+public class ParseInsertPartitionSpecTest {
+
+    private static LogicalPlanBuilder logicalPlanBuilder;
+
+    @BeforeAll
+    public static void init() {
+        ConnectContext ctx = new ConnectContext();
+        new MockUp<ConnectContext>() {
+            @Mock
+            public ConnectContext get() {
+                return ctx;
+            }
+        };
+    }
+
+    /**
+     * Helper method to invoke the private parseInsertPartitionSpec method via
+     * reflection.
+     * Uses reflection to find the method with PartitionSpecContext parameter.
+     */
+    private InsertPartitionSpec invokeParseInsertPartitionSpec(Object ctx) throws Exception {
+        logicalPlanBuilder = new LogicalPlanBuilder(Maps.newHashMap());
+        // Find the parseInsertPartitionSpec method - it takes PartitionSpecContext as
+        // parameter
+        Method targetMethod = null;
+        for (Method method : LogicalPlanBuilder.class.getDeclaredMethods()) {
+            if (method.getName().equals("parseInsertPartitionSpec")) {
+                targetMethod = method;
+                break;
+            }
+        }
+        if (targetMethod == null) {
+            throw new NoSuchMethodException("parseInsertPartitionSpec method not found");
+        }
+        targetMethod.setAccessible(true);
+        return (InsertPartitionSpec) targetMethod.invoke(logicalPlanBuilder, ctx);
+    }
+
+    /**
+     * Helper method to parse SQL and extract PartitionSpecContext using reflection.
+     */
+    private Object parsePartitionSpec(String insertSql) throws Exception {
+        // Use NereidsParser.toAst() to parse the SQL and get the AST
+        ParserRuleContext tree = NereidsParser.toAst(
+                insertSql, DorisParser::singleStatement);
+
+        // The tree is a SingleStatementContext, which contains a StatementContext
+        // which contains a StatementBaseContext which contains an InsertTableContext
+        // Use reflection to navigate the AST structure
+        Method getChildMethod = ParserRuleContext.class.getMethod("getChild", int.class);
+
+        // Get statement from singleStatement (index 0)
+        Object statement = getChildMethod.invoke(tree, 0);
+
+        // Get statementBase from statement (index 0)
+        Object statementBase = getChildMethod.invoke(statement, 0);
+
+        // Get insertTable from statementBase (index 0)
+        Object insertTableCtx = getChildMethod.invoke(statementBase, 0);
+
+        // Get partitionSpec() from insertTableCtx using the method
+        Method partitionSpecMethod = insertTableCtx.getClass().getMethod("partitionSpec");
+        return partitionSpecMethod.invoke(insertTableCtx);
+    }
+
+    @Test
+    public void testParseNullContext() throws Exception {
+        InsertPartitionSpec spec = invokeParseInsertPartitionSpec(null);
+
+        Assertions.assertFalse(spec.isStaticPartition());
+        Assertions.assertFalse(spec.hasDynamicPartitionNames());
+        Assertions.assertFalse(spec.isTemporary());
+        Assertions.assertFalse(spec.isAutoDetect());
+    }
+
+    @Test
+    public void testParseAutoDetect() throws Exception {
+        // Parse: INSERT INTO tbl PARTITION (*) SELECT ...
+        String sql = "INSERT INTO tbl PARTITION (*) SELECT * FROM src";
+        Object ctx = parsePartitionSpec(sql);
+
+        InsertPartitionSpec spec = invokeParseInsertPartitionSpec(ctx);
+
+        Assertions.assertFalse(spec.isStaticPartition());
+        Assertions.assertFalse(spec.hasDynamicPartitionNames());
+        Assertions.assertFalse(spec.isTemporary());
+        Assertions.assertTrue(spec.isAutoDetect());
+    }
+
+    @Test
+    public void testParseDynamicPartitionSingle() throws Exception {
+        // Parse: INSERT INTO tbl PARTITION p1 SELECT ...
+        String sql = "INSERT INTO tbl PARTITION p1 SELECT * FROM src";
+        Object ctx = parsePartitionSpec(sql);
+
+        InsertPartitionSpec spec = invokeParseInsertPartitionSpec(ctx);
+
+        Assertions.assertFalse(spec.isStaticPartition());
+        Assertions.assertTrue(spec.hasDynamicPartitionNames());
+        Assertions.assertFalse(spec.isTemporary());
+        Assertions.assertFalse(spec.isAutoDetect());
+        Assertions.assertEquals(1, spec.getPartitionNames().size());
+        Assertions.assertEquals("p1", spec.getPartitionNames().get(0));
+    }
+
+    @Test
+    public void testParseDynamicPartitionList() throws Exception {
+        // Parse: INSERT INTO tbl PARTITION (p1, p2, p3) SELECT ...
+        String sql = "INSERT INTO tbl PARTITION (p1, p2, p3) SELECT * FROM 
src";
+        Object ctx = parsePartitionSpec(sql);
+
+        InsertPartitionSpec spec = invokeParseInsertPartitionSpec(ctx);
+
+        Assertions.assertFalse(spec.isStaticPartition());
+        Assertions.assertTrue(spec.hasDynamicPartitionNames());
+        Assertions.assertFalse(spec.isTemporary());
+        Assertions.assertFalse(spec.isAutoDetect());
+        Assertions.assertEquals(3, spec.getPartitionNames().size());
+        Assertions.assertTrue(spec.getPartitionNames().contains("p1"));
+        Assertions.assertTrue(spec.getPartitionNames().contains("p2"));
+        Assertions.assertTrue(spec.getPartitionNames().contains("p3"));
+    }
+
+    @Test
+    public void testParseTemporaryPartition() throws Exception {
+        // Parse: INSERT INTO tbl TEMPORARY PARTITION (p1) SELECT ...
+        String sql = "INSERT INTO tbl TEMPORARY PARTITION (p1) SELECT * FROM 
src";
+        Object ctx = parsePartitionSpec(sql);
+
+        InsertPartitionSpec spec = invokeParseInsertPartitionSpec(ctx);
+
+        Assertions.assertFalse(spec.isStaticPartition());
+        Assertions.assertTrue(spec.hasDynamicPartitionNames());
+        Assertions.assertTrue(spec.isTemporary());
+        Assertions.assertFalse(spec.isAutoDetect());
+    }
+
+    @Test
+    public void testParseStaticPartitionStringValue() throws Exception {
+        // Parse: INSERT OVERWRITE TABLE tbl PARTITION (dt='2025-01-25') SELECT ...
+        String sql = "INSERT OVERWRITE TABLE tbl PARTITION (dt='2025-01-25') SELECT * FROM src";
+        Object ctx = parsePartitionSpec(sql);
+
+        InsertPartitionSpec spec = invokeParseInsertPartitionSpec(ctx);
+
+        Assertions.assertTrue(spec.isStaticPartition());
+        Assertions.assertFalse(spec.hasDynamicPartitionNames());
+        Assertions.assertFalse(spec.isTemporary());
+        Assertions.assertFalse(spec.isAutoDetect());
+
+        Map<String, Expression> staticValues = spec.getStaticPartitionValues();
+        Assertions.assertEquals(1, staticValues.size());
+        Assertions.assertTrue(staticValues.containsKey("dt"));
+        Assertions.assertTrue(staticValues.get("dt") instanceof 
StringLikeLiteral);
+        Assertions.assertEquals("2025-01-25", ((StringLikeLiteral) 
staticValues.get("dt")).getStringValue());
+    }
+
+    @Test
+    public void testParseStaticPartitionMultipleValues() throws Exception {
+        // Parse: INSERT OVERWRITE TABLE tbl PARTITION (dt='2025-01-25', region='bj') SELECT
+        // ...
+        String sql = "INSERT OVERWRITE TABLE tbl PARTITION (dt='2025-01-25', region='bj') SELECT * FROM src";
+        Object ctx = parsePartitionSpec(sql);
+
+        InsertPartitionSpec spec = invokeParseInsertPartitionSpec(ctx);
+
+        Assertions.assertTrue(spec.isStaticPartition());
+        Assertions.assertFalse(spec.hasDynamicPartitionNames());
+
+        Map<String, Expression> staticValues = spec.getStaticPartitionValues();
+        Assertions.assertEquals(2, staticValues.size());
+        Assertions.assertTrue(staticValues.containsKey("dt"));
+        Assertions.assertTrue(staticValues.containsKey("region"));
+        Assertions.assertEquals("2025-01-25", ((StringLikeLiteral) 
staticValues.get("dt")).getStringValue());
+        Assertions.assertEquals("bj", ((StringLikeLiteral) 
staticValues.get("region")).getStringValue());
+    }
+
+    @Test
+    public void testParseStaticPartitionIntegerValue() throws Exception {
+        // Parse: INSERT OVERWRITE TABLE tbl PARTITION (year=2025) SELECT ...
+        String sql = "INSERT OVERWRITE TABLE tbl PARTITION (year=2025) SELECT 
* FROM src";
+        Object ctx = parsePartitionSpec(sql);
+
+        InsertPartitionSpec spec = invokeParseInsertPartitionSpec(ctx);
+
+        Assertions.assertTrue(spec.isStaticPartition());
+
+        Map<String, Expression> staticValues = spec.getStaticPartitionValues();
+        Assertions.assertEquals(1, staticValues.size());
+        Assertions.assertTrue(staticValues.containsKey("year"));
+        // Note: The parser may parse integers as IntegerLikeLiteral
+        Expression yearExpr = staticValues.get("year");
+        Assertions.assertNotNull(yearExpr);
+    }
+
+    @Test
+    public void testParseNoPartition() throws Exception {
+        // Parse: INSERT INTO tbl SELECT ... (no partition clause)
+        String sql = "INSERT INTO tbl SELECT * FROM src";
+        Object ctx = parsePartitionSpec(sql);
+
+        InsertPartitionSpec spec = invokeParseInsertPartitionSpec(ctx);
+
+        // ctx should be null for no partition
+        Assertions.assertFalse(spec.isStaticPartition());
+        Assertions.assertFalse(spec.hasDynamicPartitionNames());
+        Assertions.assertFalse(spec.isTemporary());
+        Assertions.assertFalse(spec.isAutoDetect());
+    }
+
+    @Test
+    public void testParseStaticPartitionMixedTypes() throws Exception {
+        // Parse: INSERT OVERWRITE TABLE tbl PARTITION (year=2025, month='01', day=25) SELECT
+        // ...
+        String sql = "INSERT OVERWRITE TABLE tbl PARTITION (year=2025, month='01', day=25) SELECT * FROM src";
+        Object ctx = parsePartitionSpec(sql);
+
+        InsertPartitionSpec spec = invokeParseInsertPartitionSpec(ctx);
+
+        Assertions.assertTrue(spec.isStaticPartition());
+        Assertions.assertFalse(spec.hasDynamicPartitionNames());
+
+        Map<String, Expression> staticValues = spec.getStaticPartitionValues();
+        Assertions.assertEquals(3, staticValues.size());
+        Assertions.assertTrue(staticValues.containsKey("year"));
+        Assertions.assertTrue(staticValues.containsKey("month"));
+        Assertions.assertTrue(staticValues.containsKey("day"));
+
+        // month should be string
+        Assertions.assertTrue(staticValues.get("month") instanceof 
StringLikeLiteral);
+        Assertions.assertEquals("01", ((StringLikeLiteral) 
staticValues.get("month")).getStringValue());
+    }
+}
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index d218db5dba5..f151c58917b 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -426,6 +426,10 @@ struct TIcebergTableSink {
     12: optional string original_output_path
     13: optional PlanNodes.TFileCompressType compression_type
     14: optional list<Types.TNetworkAddress> broker_addresses;
+    // Static partition values for static partition overwrite
+    // Key: partition column name, Value: partition value as string
+    // When set, BE should use these values directly instead of computing from data
+    15: optional map<string, string> static_partition_values;
 }
 
 enum TDictLayoutType {
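
As a hedged illustration of what the new field carries, the FE-side sketch below builds the map
for PARTITION (dt='2025-01-25', region='bj'); it assumes the generated Java class lives in
org.apache.doris.thrift alongside the other sink types, and it uses only setters already called
in IcebergTableSink.java above. The values mirror the regression tests at the end of this patch.

```java
import org.apache.doris.thrift.TIcebergTableSink;

import com.google.common.collect.ImmutableMap;

// Illustrative only: the map sent from FE to BE for a static partition
// overwrite of (dt='2025-01-25', region='bj'); keys are partition column
// names, values are the literal partition values rendered as strings.
class StaticPartitionSinkSketch {
    static TIcebergTableSink example() {
        TIcebergTableSink tSink = new TIcebergTableSink();
        tSink.setOverwrite(true);
        tSink.setStaticPartitionValues(ImmutableMap.of(
                "dt", "2025-01-25",
                "region", "bj"));
        return tSink;
    }
}
```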
diff --git a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_static_partition_overwrite.out b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_static_partition_overwrite.out
new file mode 100644
index 00000000000..58cd1fa5ca4
--- /dev/null
+++ b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_static_partition_overwrite.out
@@ -0,0 +1,135 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q01 --
+1      Alice   2025-01-25      bj
+2      Bob     2025-01-25      sh
+3      Charlie 2025-01-26      bj
+4      David   2025-01-26      sh
+
+-- !q02 --
+10     Eve     2025-01-25      bj
+2      Bob     2025-01-25      sh
+3      Charlie 2025-01-26      bj
+4      David   2025-01-26      sh
+
+-- !q03_before --
+1      Alice   2025-01-25      bj
+2      Bob     2025-01-25      sh
+3      Charlie 2025-01-26      bj
+4      David   2025-01-26      sh
+
+-- !q03_after --
+10     Eve     2025-01-25      bj
+11     Frank   2025-01-25      sh
+12     Grace   2025-01-25      gz
+3      Charlie 2025-01-26      bj
+4      David   2025-01-26      sh
+
+-- !q03_partition_25 --
+10     Eve     2025-01-25      bj
+11     Frank   2025-01-25      sh
+12     Grace   2025-01-25      gz
+
+-- !q03_partition_26 --
+3      Charlie 2025-01-26      bj
+4      David   2025-01-26      sh
+
+-- !q04 --
+2      Bob     2025-01-25      sh
+3      Charlie 2025-01-26      bj
+
+-- !q05 --
+10     Eve     2025-01-25      bj      100
+2      Bob     2025-01-25      bj      200
+3      Charlie 2025-01-25      sh      100
+4      David   2025-01-26      bj      100
+
+-- !q06_before --
+1      Alice   2025-01-25      bj      food
+2      Bob     2025-01-25      bj      drink
+3      Charlie 2025-01-25      sh      food
+4      David   2025-01-26      bj      food
+
+-- !q06_after --
+10     Eve     2025-01-25      bj      electronics
+11     Frank   2025-01-25      bj      clothing
+3      Charlie 2025-01-25      sh      food
+4      David   2025-01-26      bj      food
+
+-- !q06_static_partition --
+10     Eve     2025-01-25      bj      electronics
+11     Frank   2025-01-25      bj      clothing
+
+-- !q06_other_partitions --
+3      Charlie 2025-01-25      sh      food
+4      David   2025-01-26      bj      food
+
+-- !q07 --
+10     Eve     1706140800000
+2      Bob     1706227200000
+3      Charlie 1706313600000
+
+-- !q08 --
+10     Eve     1706140800000   bj
+11     Frank   1706140800000   sh
+3      Charlie 1706227200000   bj
+
+-- !q09 --
+10     Eve     85.5
+2      Bob     90.0
+3      Charlie 75.5
+
+-- !q10 --
+10     Eve     85.5    1
+11     Frank   85.5    2
+3      Charlie 90.0    1
+
+-- !q11 --
+10     Eve     99.98999999999999
+2      Bob     199.99
+3      Charlie 299.99
+
+-- !q12 --
+10     Eve     99.98999999999999       A
+11     Frank   99.98999999999999       B
+3      Charlie 199.99  A
+
+-- !q13 --
+10     Eve     true
+2      Bob     false
+
+-- !q14 --
+10     Eve     true    1
+11     Frank   true    2
+3      Charlie false   1
+
+-- !q15 --
+10     Eve     2025-01-25T10:00
+2      Bob     2025-01-25T11:00
+3      Charlie 2025-01-26T10:00
+
+-- !q16 --
+10     Eve     2025-01-25T10:00        bj
+11     Frank   2025-01-25T10:00        sh
+3      Charlie 2025-01-26T10:00        bj
+
+-- !q17 --
+10     Eve     100.50
+2      Bob     200.75
+3      Charlie 300.25
+
+-- !q18 --
+10     Eve     100.50  10
+11     Frank   100.50  20
+3      Charlie 200.75  10
+
+-- !q19 --
+10     Eve     1       85.5    true    A
+2      Bob     1       85.5    true    B
+3      Charlie 2       90.0    false   A
+4      David   1       85.5    false   A
+
+-- !q20 --
+10     Eve     1       85.5    true    A
+11     Frank   1       85.5    false   B
+4      David   2       90.0    true    A
+
diff --git a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_static_partition_overwrite.groovy b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_static_partition_overwrite.groovy
new file mode 100644
index 00000000000..d79093b69bf
--- /dev/null
+++ b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_static_partition_overwrite.groovy
@@ -0,0 +1,1086 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_static_partition_overwrite", 
"p0,external,iceberg,external_docker,external_docker_iceberg") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String catalog_name = "test_iceberg_static_partition_overwrite"
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}";,
+        "s3.region" = "us-east-1"
+    );"""
+
+    sql """ switch ${catalog_name} """
+
+    String db1 = catalog_name + "_db"
+    String tb1 = db1 + "_tb1"
+
+    sql """ drop database if exists ${db1} force"""
+    sql """ create database ${db1} """
+    sql """ use ${db1} """
+
+    // Test Case 1: Full static partition overwrite (all partition columns specified)
+    // Test overwriting a specific partition with all partition columns specified
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            dt DATE,
+            region STRING
+        ) ENGINE=iceberg
+        PARTITION BY LIST (dt, region) ()
+    """
+
+    // Insert initial data
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', '2025-01-25', 'bj'),
+        (2, 'Bob', '2025-01-25', 'sh'),
+        (3, 'Charlie', '2025-01-26', 'bj'),
+        (4, 'David', '2025-01-26', 'sh')
+    """
+
+    // Verify initial data
+    order_qt_q01 """ SELECT * FROM ${tb1} ORDER BY id """
+
+    // Overwrite specific partition (dt='2025-01-25', region='bj')
+    sql """
+        INSERT OVERWRITE TABLE ${tb1} 
+        PARTITION (dt='2025-01-25', region='bj')
+        SELECT 10, 'Eve'
+    """
+
+    // Verify: Only (dt='2025-01-25', region='bj') partition is overwritten
+    // Other partitions remain unchanged
+    order_qt_q02 """ SELECT * FROM ${tb1} ORDER BY id """
+
+    // Test Case 2: Hybrid partition mode - partial static + partial dynamic
+    // Static partition column (dt) comes from PARTITION clause
+    // Dynamic partition column (region) comes from SELECT query result
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            dt DATE,
+            region STRING
+        ) ENGINE=iceberg
+        PARTITION BY LIST (dt, region) ()
+    """
+
+    // Create source table for hybrid partition test
+    String tb_src = db1 + "_src"
+    sql """ DROP TABLE IF EXISTS ${tb_src} """
+    sql """
+        CREATE TABLE ${tb_src} (
+            id BIGINT,
+            name STRING,
+            region STRING
+        ) ENGINE=iceberg
+    """
+
+    // Insert source data with different regions
+    sql """
+        INSERT INTO ${tb_src} VALUES
+        (10, 'Eve', 'bj'),
+        (11, 'Frank', 'sh'),
+        (12, 'Grace', 'gz')
+    """
+
+    // Insert initial data to target table
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', '2025-01-25', 'bj'),
+        (2, 'Bob', '2025-01-25', 'sh'),
+        (3, 'Charlie', '2025-01-26', 'bj'),
+        (4, 'David', '2025-01-26', 'sh')
+    """
+
+    // Verify initial data
+    order_qt_q03_before """ SELECT * FROM ${tb1} ORDER BY id """
+
+    // Hybrid mode: dt='2025-01-25' is static, region comes from source table dynamically
+    // This should:
+    // 1. Delete all data where dt='2025-01-25'
+    // 2. Insert new data with dt='2025-01-25' and region values from source table
+    // Note: SELECT does NOT include 'dt' column - it comes from PARTITION clause
+    sql """
+        INSERT OVERWRITE TABLE ${tb1} 
+        PARTITION (dt='2025-01-25')
+        SELECT id, name, region FROM ${tb_src}
+    """
+
+    // Verify: 
+    // - Partitions with dt='2025-01-25' are replaced with new data (bj, sh, gz regions)
+    // - Partitions with dt='2025-01-26' remain unchanged
+    order_qt_q03_after """ SELECT * FROM ${tb1} ORDER BY id """
+
+    // Verify partition data distribution
+    order_qt_q03_partition_25 """ SELECT * FROM ${tb1} WHERE dt='2025-01-25' ORDER BY id """
+    order_qt_q03_partition_26 """ SELECT * FROM ${tb1} WHERE dt='2025-01-26' ORDER BY id """
+
+    sql """ DROP TABLE IF EXISTS ${tb_src} """
+
+    // Test Case 3: Empty result overwrite (delete specified partition)
+    // Test deleting a partition by overwriting with empty result
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            dt DATE,
+            region STRING
+        ) ENGINE=iceberg
+        PARTITION BY LIST (dt, region) ()
+    """
+
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', '2025-01-25', 'bj'),
+        (2, 'Bob', '2025-01-25', 'sh'),
+        (3, 'Charlie', '2025-01-26', 'bj')
+    """
+
+    // Overwrite with empty result to delete the specified partition
+    sql """
+        INSERT OVERWRITE TABLE ${tb1} 
+        PARTITION (dt='2025-01-25', region='bj')
+        SELECT id, name FROM ${tb1} WHERE 1=0
+    """
+
+    // Verify: Specified partition is deleted, other partitions remain unchanged
+    order_qt_q04 """ SELECT * FROM ${tb1} ORDER BY id """
+
+    // Test Case 4: Error scenario - non-existent partition column
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            dt DATE,
+            region STRING
+        ) ENGINE=iceberg
+        PARTITION BY LIST (dt, region) ()
+    """
+
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (invalid_col='value')
+            SELECT * FROM ${tb1}
+        """
+        exception "Unknown partition column"
+    }
+
+    // Test Case 5: Multiple static partitions with different data types (full static)
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            dt DATE,
+            region STRING,
+            amount INT
+        ) ENGINE=iceberg
+        PARTITION BY LIST (dt, region, amount) ()
+    """
+
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', '2025-01-25', 'bj', 100),
+        (2, 'Bob', '2025-01-25', 'bj', 200),
+        (3, 'Charlie', '2025-01-25', 'sh', 100),
+        (4, 'David', '2025-01-26', 'bj', 100)
+    """
+
+    // Overwrite partition with multiple partition columns including integer type
+    sql """
+        INSERT OVERWRITE TABLE ${tb1} 
+        PARTITION (dt='2025-01-25', region='bj', amount=100)
+        SELECT 10, 'Eve'
+    """
+
+    // Verify: Only the exact matching partition is overwritten
+    order_qt_q05 """ SELECT * FROM ${tb1} ORDER BY id """
+
+    // Test Case 6: Hybrid mode with three partition columns (2 static + 1 dynamic)
+    // Table has partition columns: (dt, region, category)
+    // Static: dt='2025-01-25', region='bj'
+    // Dynamic: category comes from SELECT
+    String tb2 = db1 + "_tb2"
+    sql """ DROP TABLE IF EXISTS ${tb2} """
+    sql """
+        CREATE TABLE ${tb2} (
+            id BIGINT,
+            name STRING,
+            dt DATE,
+            region STRING,
+            category STRING
+        ) ENGINE=iceberg
+        PARTITION BY LIST (dt, region, category) ()
+    """
+
+    // Insert initial data with different categories
+    sql """
+        INSERT INTO ${tb2} VALUES
+        (1, 'Alice', '2025-01-25', 'bj', 'food'),
+        (2, 'Bob', '2025-01-25', 'bj', 'drink'),
+        (3, 'Charlie', '2025-01-25', 'sh', 'food'),
+        (4, 'David', '2025-01-26', 'bj', 'food')
+    """
+
+    order_qt_q06_before """ SELECT * FROM ${tb2} ORDER BY id """
+
+    // Create source table for dynamic category values
+    String tb2_src = db1 + "_tb2_src"
+    sql """ DROP TABLE IF EXISTS ${tb2_src} """
+    sql """
+        CREATE TABLE ${tb2_src} (
+            id BIGINT,
+            name STRING,
+            category STRING
+        ) ENGINE=iceberg
+    """
+
+    sql """
+        INSERT INTO ${tb2_src} VALUES
+        (10, 'Eve', 'electronics'),
+        (11, 'Frank', 'clothing')
+    """
+
+    // Hybrid mode: dt and region are static, category is dynamic from source
+    // SELECT should only include: id, name, category (not dt, region)
+    sql """
+        INSERT OVERWRITE TABLE ${tb2} 
+        PARTITION (dt='2025-01-25', region='bj')
+        SELECT id, name, category FROM ${tb2_src}
+    """
+
+    // Verify:
+    // - All partitions with dt='2025-01-25' AND region='bj' are replaced
+    // - Other partitions remain unchanged
+    order_qt_q06_after """ SELECT * FROM ${tb2} ORDER BY id """
+
+    // Verify specific partition filters
+    order_qt_q06_static_partition """ SELECT * FROM ${tb2} WHERE dt='2025-01-25' AND region='bj' ORDER BY id """
+    order_qt_q06_other_partitions """ SELECT * FROM ${tb2} WHERE NOT (dt='2025-01-25' AND region='bj') ORDER BY id """
+
+    sql """ DROP TABLE IF EXISTS ${tb2_src} """
+    sql """ DROP TABLE IF EXISTS ${tb2} """
+
+    // Test Case 7: Static partition with LONG type
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            timestamp_val BIGINT
+        ) ENGINE=iceberg
+        PARTITION BY LIST (timestamp_val) ()
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', 1706140800000),
+        (2, 'Bob', 1706227200000),
+        (3, 'Charlie', 1706313600000)
+    """
+    sql """
+        INSERT OVERWRITE TABLE ${tb1} 
+        PARTITION (timestamp_val=1706140800000)
+        SELECT 10, 'Eve'
+    """
+    order_qt_q07 """ SELECT * FROM ${tb1} ORDER BY id """
+
+    // Test Case 8: Hybrid mode with LONG type (static) + STRING (dynamic)
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            timestamp_val BIGINT,
+            region STRING
+        ) ENGINE=iceberg
+        PARTITION BY LIST (timestamp_val, region) ()
+    """
+    String tb_long_src = db1 + "_long_src"
+    sql """ DROP TABLE IF EXISTS ${tb_long_src} """
+    sql """
+        CREATE TABLE ${tb_long_src} (
+            id BIGINT,
+            name STRING,
+            region STRING
+        ) ENGINE=iceberg
+    """
+    sql """
+        INSERT INTO ${tb_long_src} VALUES
+        (10, 'Eve', 'bj'),
+        (11, 'Frank', 'sh')
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', 1706140800000, 'bj'),
+        (2, 'Bob', 1706140800000, 'sh'),
+        (3, 'Charlie', 1706227200000, 'bj')
+    """
+    sql """
+        INSERT OVERWRITE TABLE ${tb1} 
+        PARTITION (timestamp_val=1706140800000)
+        SELECT id, name, region FROM ${tb_long_src}
+    """
+    order_qt_q08 """ SELECT * FROM ${tb1} ORDER BY id """
+    sql """ DROP TABLE IF EXISTS ${tb_long_src} """
+
+    // Test Case 9: Static partition with FLOAT type
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            score FLOAT
+        ) ENGINE=iceberg
+        PARTITION BY LIST (score) ()
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', 85.5),
+        (2, 'Bob', 90.0),
+        (3, 'Charlie', 75.5)
+    """
+    sql """
+        INSERT OVERWRITE TABLE ${tb1} 
+        PARTITION (score=85.5)
+        SELECT 10, 'Eve'
+    """
+    order_qt_q09 """ SELECT * FROM ${tb1} ORDER BY id """
+
+    // Test Case 10: Hybrid mode with FLOAT type (static) + INTEGER (dynamic)
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            score FLOAT,
+            level INT
+        ) ENGINE=iceberg
+        PARTITION BY LIST (score, level) ()
+    """
+    String tb_float_src = db1 + "_float_src"
+    sql """ DROP TABLE IF EXISTS ${tb_float_src} """
+    sql """
+        CREATE TABLE ${tb_float_src} (
+            id BIGINT,
+            name STRING,
+            level INT
+        ) ENGINE=iceberg
+    """
+    sql """
+        INSERT INTO ${tb_float_src} VALUES
+        (10, 'Eve', 1),
+        (11, 'Frank', 2)
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', 85.5, 1),
+        (2, 'Bob', 85.5, 2),
+        (3, 'Charlie', 90.0, 1)
+    """
+    sql """
+        INSERT OVERWRITE TABLE ${tb1} 
+        PARTITION (score=85.5)
+        SELECT id, name, level FROM ${tb_float_src}
+    """
+    order_qt_q10 """ SELECT * FROM ${tb1} ORDER BY id """
+    sql """ DROP TABLE IF EXISTS ${tb_float_src} """
+
+    // Test Case 11: Static partition with DOUBLE type
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            price DOUBLE
+        ) ENGINE=iceberg
+        PARTITION BY LIST (price) ()
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', 99.99),
+        (2, 'Bob', 199.99),
+        (3, 'Charlie', 299.99)
+    """
+    sql """
+        INSERT OVERWRITE TABLE ${tb1} 
+        PARTITION (price=99.99)
+        SELECT 10, 'Eve'
+    """
+    order_qt_q11 """ SELECT * FROM ${tb1} ORDER BY id """
+
+    // Test Case 12: Hybrid mode with DOUBLE type (static) + STRING (dynamic)
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            price DOUBLE,
+            category STRING
+        ) ENGINE=iceberg
+        PARTITION BY LIST (price, category) ()
+    """
+    String tb_double_src = db1 + "_double_src"
+    sql """ DROP TABLE IF EXISTS ${tb_double_src} """
+    sql """
+        CREATE TABLE ${tb_double_src} (
+            id BIGINT,
+            name STRING,
+            category STRING
+        ) ENGINE=iceberg
+    """
+    sql """
+        INSERT INTO ${tb_double_src} VALUES
+        (10, 'Eve', 'A'),
+        (11, 'Frank', 'B')
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', 99.99, 'A'),
+        (2, 'Bob', 99.99, 'B'),
+        (3, 'Charlie', 199.99, 'A')
+    """
+    sql """
+        INSERT OVERWRITE TABLE ${tb1} 
+        PARTITION (price=99.99)
+        SELECT id, name, category FROM ${tb_double_src}
+    """
+    order_qt_q12 """ SELECT * FROM ${tb1} ORDER BY id """
+    sql """ DROP TABLE IF EXISTS ${tb_double_src} """
+
+    // Test Case 13: Static partition with BOOLEAN type
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            is_active BOOLEAN
+        ) ENGINE=iceberg
+        PARTITION BY LIST (is_active) ()
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', true),
+        (2, 'Bob', false),
+        (3, 'Charlie', true)
+    """
+    sql """
+        INSERT OVERWRITE TABLE ${tb1} 
+        PARTITION (is_active=true)
+        SELECT 10, 'Eve'
+    """
+    order_qt_q13 """ SELECT * FROM ${tb1} ORDER BY id """
+
+    // Test Case 14: Hybrid mode with BOOLEAN type (static) + INTEGER (dynamic)
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            is_active BOOLEAN,
+            status INT
+        ) ENGINE=iceberg
+        PARTITION BY LIST (is_active, status) ()
+    """
+    String tb_bool_src = db1 + "_bool_src"
+    sql """ DROP TABLE IF EXISTS ${tb_bool_src} """
+    sql """
+        CREATE TABLE ${tb_bool_src} (
+            id BIGINT,
+            name STRING,
+            status INT
+        ) ENGINE=iceberg
+    """
+    sql """
+        INSERT INTO ${tb_bool_src} VALUES
+        (10, 'Eve', 1),
+        (11, 'Frank', 2)
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', true, 1),
+        (2, 'Bob', true, 2),
+        (3, 'Charlie', false, 1)
+    """
+    sql """
+        INSERT OVERWRITE TABLE ${tb1} 
+        PARTITION (is_active=true)
+        SELECT id, name, status FROM ${tb_bool_src}
+    """
+    order_qt_q14 """ SELECT * FROM ${tb1} ORDER BY id """
+    sql """ DROP TABLE IF EXISTS ${tb_bool_src} """
+
+    // Test Case 15: Static partition with DATETIME type
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            ts DATETIME
+        ) ENGINE=iceberg
+        PARTITION BY LIST (ts) ()
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', '2025-01-25 10:00:00'),
+        (2, 'Bob', '2025-01-25 11:00:00'),
+        (3, 'Charlie', '2025-01-26 10:00:00')
+    """
+    sql """
+        INSERT OVERWRITE TABLE ${tb1} 
+        PARTITION (ts='2025-01-25 10:00:00')
+        SELECT 10, 'Eve'
+    """
+    order_qt_q15 """ SELECT * FROM ${tb1} ORDER BY id """
+
+    // Test Case 16: Hybrid mode with DATETIME type (static) + STRING (dynamic)
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            ts DATETIME,
+            region STRING
+        ) ENGINE=iceberg
+        PARTITION BY LIST (ts, region) ()
+    """
+    String tb_ts_src = db1 + "_ts_src"
+    sql """ DROP TABLE IF EXISTS ${tb_ts_src} """
+    sql """
+        CREATE TABLE ${tb_ts_src} (
+            id BIGINT,
+            name STRING,
+            region STRING
+        ) ENGINE=iceberg
+    """
+    sql """
+        INSERT INTO ${tb_ts_src} VALUES
+        (10, 'Eve', 'bj'),
+        (11, 'Frank', 'sh')
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', '2025-01-25 10:00:00', 'bj'),
+        (2, 'Bob', '2025-01-25 10:00:00', 'sh'),
+        (3, 'Charlie', '2025-01-26 10:00:00', 'bj')
+    """
+    sql """
+        INSERT OVERWRITE TABLE ${tb1} 
+        PARTITION (ts='2025-01-25 10:00:00')
+        SELECT id, name, region FROM ${tb_ts_src}
+    """
+    order_qt_q16 """ SELECT * FROM ${tb1} ORDER BY id """
+    sql """ DROP TABLE IF EXISTS ${tb_ts_src} """
+
+    // Test Case 17: Static partition with DECIMAL type
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            amount DECIMAL(10,2)
+        ) ENGINE=iceberg
+        PARTITION BY LIST (amount) ()
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', 100.50),
+        (2, 'Bob', 200.75),
+        (3, 'Charlie', 300.25)
+    """
+    sql """
+        INSERT OVERWRITE TABLE ${tb1} 
+        PARTITION (amount=100.50)
+        SELECT 10, 'Eve'
+    """
+    order_qt_q17 """ SELECT * FROM ${tb1} ORDER BY id """
+
+    // Test Case 18: Hybrid mode with DECIMAL type (static) + INTEGER (dynamic)
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            amount DECIMAL(10,2),
+            quantity INT
+        ) ENGINE=iceberg
+        PARTITION BY LIST (amount, quantity) ()
+    """
+    String tb_decimal_src = db1 + "_decimal_src"
+    sql """ DROP TABLE IF EXISTS ${tb_decimal_src} """
+    sql """
+        CREATE TABLE ${tb_decimal_src} (
+            id BIGINT,
+            name STRING,
+            quantity INT
+        ) ENGINE=iceberg
+    """
+    sql """
+        INSERT INTO ${tb_decimal_src} VALUES
+        (10, 'Eve', 10),
+        (11, 'Frank', 20)
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', 100.50, 10),
+        (2, 'Bob', 100.50, 20),
+        (3, 'Charlie', 200.75, 10)
+    """
+    sql """
+        INSERT OVERWRITE TABLE ${tb1} 
+        PARTITION (amount=100.50)
+        SELECT id, name, quantity FROM ${tb_decimal_src}
+    """
+    order_qt_q18 """ SELECT * FROM ${tb1} ORDER BY id """
+    sql """ DROP TABLE IF EXISTS ${tb_decimal_src} """
+
+    // Test Case 19: Multiple types in static partition (INTEGER, FLOAT, BOOLEAN, STRING)
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            level INT,
+            score FLOAT,
+            is_active BOOLEAN,
+            category STRING
+        ) ENGINE=iceberg
+        PARTITION BY LIST (level, score, is_active, category) ()
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', 1, 85.5, true, 'A'),
+        (2, 'Bob', 1, 85.5, true, 'B'),
+        (3, 'Charlie', 2, 90.0, false, 'A'),
+        (4, 'David', 1, 85.5, false, 'A')
+    """
+    sql """
+        INSERT OVERWRITE TABLE ${tb1} 
+        PARTITION (level=1, score=85.5, is_active=true, category='A')
+        SELECT 10, 'Eve'
+    """
+    order_qt_q19 """ SELECT * FROM ${tb1} ORDER BY id """
+
+    // Test Case 20: Hybrid mode with multiple types (2 static + 2 dynamic)
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            level INT,
+            score FLOAT,
+            is_active BOOLEAN,
+            category STRING
+        ) ENGINE=iceberg
+        PARTITION BY LIST (level, score, is_active, category) ()
+    """
+    String tb_multi_src = db1 + "_multi_src"
+    sql """ DROP TABLE IF EXISTS ${tb_multi_src} """
+    sql """
+        CREATE TABLE ${tb_multi_src} (
+            id BIGINT,
+            name STRING,
+            is_active BOOLEAN,
+            category STRING
+        ) ENGINE=iceberg
+    """
+    sql """
+        INSERT INTO ${tb_multi_src} VALUES
+        (10, 'Eve', true, 'A'),
+        (11, 'Frank', false, 'B')
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', 1, 85.5, true, 'A'),
+        (2, 'Bob', 1, 85.5, true, 'B'),
+        (3, 'Charlie', 1, 85.5, false, 'A'),
+        (4, 'David', 2, 90.0, true, 'A')
+    """
+    sql """
+        INSERT OVERWRITE TABLE ${tb1} 
+        PARTITION (level=1, score=85.5)
+        SELECT id, name, is_active, category FROM ${tb_multi_src}
+    """
+    order_qt_q20 """ SELECT * FROM ${tb1} ORDER BY id """
+    sql """ DROP TABLE IF EXISTS ${tb_multi_src} """
+
+    // ============================================================================
+    // Test Cases for non-identity partition transforms - static partition overwrite should fail
+    // ============================================================================
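+    // Iceberg derives hidden partition field names from the transform, e.g.
+    // bucket(4, category) -> category_bucket, truncate(3, description) -> description_trunc,
+    // day(ts) -> ts_day, and likewise for year/month/hour. Each case below first probes
+    // the source column name (expecting "Unknown partition column") and then the derived
+    // field name (expecting the non-identity error).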
+
+    // Test Case 21: Error scenario - bucket partition (non-identity transform)
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            category STRING
+        ) ENGINE=iceberg
+        PARTITION BY LIST (bucket(4, category)) ()
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', 'food'),
+        (2, 'Bob', 'drink')
+    """
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (category='food')
+            SELECT 10, 'Eve'
+        """
+        exception "Unknown partition column"
+    }
+    // Using correct partition field name should trigger non-identity error
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (category_bucket=0)
+            SELECT 10, 'Eve'
+        """
+        exception "Cannot use static partition syntax for non-identity partition field"
+    }
+
+    // Test Case 22: Error scenario - truncate partition (non-identity transform)
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            description STRING
+        ) ENGINE=iceberg
+        PARTITION BY LIST (truncate(3, description)) ()
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', 'hello world'),
+        (2, 'Bob', 'goodbye')
+    """
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (description='hello')
+            SELECT 10, 'Eve'
+        """
+        exception "Unknown partition column"
+    }
+    // Using correct partition field name should trigger non-identity error
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (description_trunc='hel')
+            SELECT 10, 'Eve'
+        """
+        exception "Cannot use static partition syntax for non-identity partition field"
+    }
+
+    // Test Case 23: Error scenario - day partition (non-identity time transform)
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            ts DATETIME
+        ) ENGINE=iceberg
+        PARTITION BY LIST (day(ts)) ()
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', '2025-01-25 10:00:00'),
+        (2, 'Bob', '2025-01-26 11:00:00')
+    """
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (ts='2025-01-25 10:00:00')
+            SELECT 10, 'Eve'
+        """
+        exception "Unknown partition column"
+    }
+    // Using correct partition field name should trigger non-identity error
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (ts_day='2025-01-25')
+            SELECT 10, 'Eve'
+        """
+        exception "Cannot use static partition syntax for non-identity partition field"
+    }
+
+    // Test Case 24: Error scenario - year partition (non-identity time transform)
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            event_date DATE
+        ) ENGINE=iceberg
+        PARTITION BY LIST (year(event_date)) ()
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', '2024-06-15'),
+        (2, 'Bob', '2025-01-25')
+    """
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (event_date='2024-06-15')
+            SELECT 10, 'Eve'
+        """
+        exception "Unknown partition column"
+    }
+    // Using correct partition field name should trigger non-identity error
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (event_date_year=2024)
+            SELECT 10, 'Eve'
+        """
+        exception "Cannot use static partition syntax for non-identity partition field"
+    }
+
+    // Test Case 25: Error scenario - month partition (non-identity time transform)
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            event_date DATE
+        ) ENGINE=iceberg
+        PARTITION BY LIST (month(event_date)) ()
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', '2025-01-15'),
+        (2, 'Bob', '2025-02-20')
+    """
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (event_date='2025-01-15')
+            SELECT 10, 'Eve'
+        """
+        exception "Unknown partition column"
+    }
+    // Using correct partition field name should trigger non-identity error
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (event_date_month='2025-01')
+            SELECT 10, 'Eve'
+        """
+        exception "Cannot use static partition syntax for non-identity partition field"
+    }
+
+    // Test Case 26: Error scenario - hour partition (non-identity time transform)
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            ts DATETIME
+        ) ENGINE=iceberg
+        PARTITION BY LIST (hour(ts)) ()
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', '2025-01-25 10:30:00'),
+        (2, 'Bob', '2025-01-25 11:45:00')
+    """
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (ts='2025-01-25 10:00:00')
+            SELECT 10, 'Eve'
+        """
+        exception "Unknown partition column"
+    }
+    // Using correct partition field name should trigger non-identity error
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (ts_hour='2025-01-25-10')
+            SELECT 10, 'Eve'
+        """
+        exception "Cannot use static partition syntax for non-identity partition field"
+    }
+
+    // Test Case 27: Error scenario - mixed identity and non-identity partitions (bucket)
+    // Table has identity partition (region) + non-identity partition (bucket on id)
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            region STRING
+        ) ENGINE=iceberg
+        PARTITION BY LIST (region, bucket(4, id)) ()
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', 'bj'),
+        (2, 'Bob', 'sh')
+    """
+    // Static partition on non-identity column should fail
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (id=1)
+            SELECT 'Eve', 'bj'
+        """
+        exception "Unknown partition column"
+    }
+    // Using correct partition field name should trigger non-identity error
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (id_bucket=0)
+            SELECT 'Eve', 'bj'
+        """
+        exception "Cannot use static partition syntax for non-identity partition field"
+    }
+
+    // Test Case 28: Error scenario - truncate on integer column
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            amount INT
+        ) ENGINE=iceberg
+        PARTITION BY LIST (truncate(100, amount)) ()
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', 150),
+        (2, 'Bob', 250)
+    """
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (amount=100)
+            SELECT 10, 'Eve'
+        """
+        exception "Unknown partition column"
+    }
+    // Using correct partition field name should trigger non-identity error
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (amount_trunc=100)
+            SELECT 10, 'Eve'
+        """
+        exception "Cannot use static partition syntax for non-identity partition field"
+    }
+
+    // Test Case 29: Error scenario - bucket on BIGINT column
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            user_id BIGINT
+        ) ENGINE=iceberg
+        PARTITION BY LIST (bucket(8, user_id)) ()
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', 1001),
+        (2, 'Bob', 2002)
+    """
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (user_id=1001)
+            SELECT 10, 'Eve'
+        """
+        exception "Unknown partition column"
+    }
+    // Using correct partition field name should trigger non-identity error
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (user_id_bucket=0)
+            SELECT 10, 'Eve'
+        """
+        exception "Cannot use static partition syntax for non-identity partition field"
+    }
+
+    // Test Case 30: Mixed partitions - identity column is OK, but non-identity should fail
+    // Test that specifying only identity partition columns works,
+    // but including non-identity columns fails
+    sql """ DROP TABLE IF EXISTS ${tb1} """
+    sql """
+        CREATE TABLE ${tb1} (
+            id BIGINT,
+            name STRING,
+            region STRING,
+            ts DATETIME
+        ) ENGINE=iceberg
+        PARTITION BY LIST (region, day(ts)) ()
+    """
+    sql """
+        INSERT INTO ${tb1} VALUES
+        (1, 'Alice', 'bj', '2025-01-25 10:00:00'),
+        (2, 'Bob', 'sh', '2025-01-26 11:00:00')
+    """
+    // Specifying only the identity partition column (region) should work normally,
+    // but ts must still appear in the SELECT list so that day(ts) can be evaluated
+    // dynamically: with a partial static partition, any partition columns not listed
+    // in PARTITION (...) are taken from the query result.
+    // For simplicity, only the error case where a non-identity column is specified
+    // is tested here; a sketch of the working case follows.
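+    // A sketch of the working identity-only case, for reference rather than execution:
+    //   INSERT OVERWRITE TABLE ${tb1}
+    //   PARTITION (region='bj')
+    //   SELECT 10, 'Eve', '2025-01-25 10:00:00'
+    // The static region value comes from the PARTITION clause, while ts stays in the
+    // SELECT list so the day(ts) partition can be resolved dynamically.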
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (ts='2025-01-25 10:00:00')
+            SELECT 10, 'Eve', 'bj'
+        """
+        exception "Unknown partition column"
+    }
+    // Using correct partition field name should trigger non-identity error
+    test {
+        sql """
+            INSERT OVERWRITE TABLE ${tb1} 
+            PARTITION (ts_day='2025-01-25')
+            SELECT 10, 'Eve', 'bj'
+        """
+        exception "Cannot use static partition syntax for non-identity partition field"
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
