This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8a974d4808a [feature](iceberg) Support static partition overwrite for Iceberg tables (#58396)
8a974d4808a is described below
commit 8a974d4808af5173f6c43e2dc68a1c970f392535
Author: Socrates <[email protected]>
AuthorDate: Fri Dec 5 14:09:47 2025 +0800
[feature](iceberg) Support static partition overwrite for Iceberg tables (#58396)
### What problem does this PR solve?
### Proposed changes
This PR implements static partition overwrite for Iceberg external tables, allowing users to overwrite specific partitions with the `INSERT OVERWRITE ... PARTITION (col='value', ...)` syntax.
### Background
Before this PR, Doris supported:
- ✅ `INSERT INTO` with dynamic partitioning for Iceberg tables
- ✅ `INSERT OVERWRITE` for full table replacement
- ❌ `INSERT OVERWRITE ... PARTITION (...)` for static partition overwrite
### New Features
1. **Full Static Partition Mode**: Overwrite a specific partition when all partition columns are specified
```sql
INSERT OVERWRITE TABLE iceberg_db.tbl PARTITION (dt='2025-01-25', region='bj')
SELECT id, name FROM source_table;
```
2. **Hybrid Partition Mode**: Partially static and partially dynamic partition columns
```sql
-- dt is static, region comes from SELECT dynamically
INSERT OVERWRITE TABLE iceberg_db.tbl PARTITION (dt='2025-01-25')
SELECT id, name, region FROM source_table;
```
### Implementation Details
#### FE Changes
- **Parser** (`DorisParser.g4`, `LogicalPlanBuilder.java`): Extended partition spec parsing to support the `PARTITION (col='value', ...)` syntax
- **InsertPartitionSpec**: New unified data structure representing the partition modes (auto-detect, dynamic, static)
- **UnboundIcebergTableSink**: Added a `staticPartitionKeyValues` field to carry static partition info
- **BindSink**: Added validation of static partition columns and generation of constant expressions for the static partition values
- **IcebergTransaction**: Implemented `commitStaticPartitionOverwrite()` using Iceberg's `OverwriteFiles.overwriteByRowFilter()` API (see the sketch after this list)
- **IcebergUtils**: Added a `parsePartitionValueFromString()` utility for partition value type conversion
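
The commit path in `IcebergTransaction` works roughly as sketched below (a condensed, illustrative sketch based on this diff; the helper class, method signature, and map shape are not part of the patch): each static identity partition column becomes an equality predicate on its source column, the predicates are ANDed into a row filter, and `OverwriteFiles.overwriteByRowFilter()` replaces the matching files while the newly written data files are added in the same atomic commit.

```java
// Condensed sketch of the static partition overwrite commit path (illustrative;
// see the IcebergTransaction diff below for the actual implementation).
import org.apache.iceberg.DataFile;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

import java.util.List;
import java.util.Map;

final class StaticPartitionOverwriteSketch {
    /**
     * @param staticPartitions partition column name -> value, already converted
     *                         to the source column's type (hypothetical map shape)
     */
    static void commit(Transaction txn, Map<String, Object> staticPartitions, List<DataFile> newFiles) {
        PartitionSpec spec = txn.table().spec();
        Schema schema = txn.table().schema();

        // AND together one equality predicate per static identity partition column.
        Expression filter = Expressions.alwaysTrue();
        for (PartitionField field : spec.fields()) {
            Object value = staticPartitions.get(field.name());
            if (value == null) {
                continue; // dynamic column in hybrid mode
            }
            // Identity partitions are addressed by their source column name.
            String sourceCol = schema.findField(field.sourceId()).name();
            filter = Expressions.and(filter, Expressions.equal(sourceCol, value));
        }

        // Delete files matching the filter and add the new files in one atomic snapshot.
        OverwriteFiles overwrite = txn.newOverwrite().overwriteByRowFilter(filter);
        newFiles.forEach(overwrite::addFile);
        overwrite.commit();
    }
}
```

In the actual patch, a null static partition value maps to `Expressions.isNull()` on the source column, and each pending `WriteResult` is required to carry no referenced data files before its new data files are added.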
#### BE Changes
- **VIcebergTableWriter**:
  - Support full static partition mode (all data goes to a single partition)
  - Support hybrid partition mode (static columns from the sink config, dynamic columns from the data)
  - Added `_is_full_static_partition`, `_has_static_partition`, and the per-column `_partition_column_is_static` flags for mode detection
#### Thrift Changes
- Added a `static_partition_values` field to `TIcebergTableSink` for passing static partition info from FE to BE
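
On the FE side, the field is populated only for static partition overwrites; a minimal sketch of that wiring, condensed from the `IcebergTableSink.java` hunk in this diff (the class wrapper and method name here are illustrative, the field-setting logic is taken from the hunk):

```java
// `tSink` is the Thrift TIcebergTableSink being built and `context` is the
// IcebergInsertCommandContext carrying the parsed PARTITION clause values.
import java.util.Map;

final class StaticPartitionSinkWiring {
    static void attach(TIcebergTableSink tSink, IcebergInsertCommandContext context) {
        tSink.setOverwrite(context.isOverwrite());
        // Only set the new Thrift field for a static partition overwrite, so the
        // plain overwrite and append paths on the BE stay unchanged.
        if (context.isStaticPartitionOverwrite()) {
            Map<String, String> values = context.getStaticPartitionValues();
            if (values != null && !values.isEmpty()) {
                tSink.setStaticPartitionValues(values);
            }
        }
    }
}
```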
---
.../sink/writer/iceberg/viceberg_table_writer.cpp | 227 +++-
.../sink/writer/iceberg/viceberg_table_writer.h | 21 +
.../antlr4/org/apache/doris/nereids/DorisParser.g4 | 6 +-
.../datasource/iceberg/IcebergTransaction.java | 106 +-
.../nereids/analyzer/UnboundIcebergTableSink.java | 42 +-
.../nereids/analyzer/UnboundTableSinkCreator.java | 24 +-
.../doris/nereids/parser/InsertPartitionSpec.java | 109 ++
.../doris/nereids/parser/LogicalPlanBuilder.java | 76 +-
.../doris/nereids/rules/analysis/BindSink.java | 106 +-
.../insert/IcebergInsertCommandContext.java | 23 +
.../insert/InsertOverwriteTableCommand.java | 35 +-
.../org/apache/doris/planner/IcebergTableSink.java | 8 +
.../nereids/parser/InsertPartitionSpecTest.java | 217 ++++
.../parser/ParseInsertPartitionSpecTest.java | 271 +++++
gensrc/thrift/DataSinks.thrift | 4 +
.../test_iceberg_static_partition_overwrite.out | 135 +++
.../test_iceberg_static_partition_overwrite.groovy | 1086 ++++++++++++++++++++
17 files changed, 2432 insertions(+), 64 deletions(-)
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 451313053ea..24b29df5cdb 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -18,9 +18,11 @@
#include "viceberg_table_writer.h"
#include "runtime/runtime_state.h"
+#include "vec/columns/column_const.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/materialize_block.h"
+#include "vec/data_types/serde/data_type_serde.h"
#include "vec/exec/format/table/iceberg/partition_spec_parser.h"
#include "vec/exec/format/table/iceberg/schema_parser.h"
#include "vec/exprs/vexpr.h"
@@ -78,6 +80,9 @@ Status VIcebergTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
            _partition_spec = iceberg::PartitionSpecParser::from_json(_schema, partition_spec_json);
_iceberg_partition_columns = _to_iceberg_partition_columns();
}
+
+ // Initialize static partition values if present
+ _init_static_partition_values();
} catch (doris::Exception& e) {
return e.to_status();
}
@@ -116,6 +121,68 @@ VIcebergTableWriter::_to_iceberg_partition_columns() {
return partition_columns;
}
+void VIcebergTableWriter::_init_static_partition_values() {
+ auto& iceberg_sink = _t_sink.iceberg_table_sink;
+ if (!iceberg_sink.__isset.static_partition_values ||
+ iceberg_sink.static_partition_values.empty()) {
+ _has_static_partition = false;
+ _is_full_static_partition = false;
+ return;
+ }
+
+ _has_static_partition = true;
+ const auto& static_values_map = iceberg_sink.static_partition_values;
+
+ size_t num_cols = _iceberg_partition_columns.size();
+ _partition_column_static_values.resize(num_cols);
+ _partition_column_is_static.assign(num_cols, 0);
+
+ size_t dynamic_count = 0;
+ for (size_t i = 0; i < num_cols; ++i) {
+        const std::string& col_name = _iceberg_partition_columns[i].field().name();
+ auto it = static_values_map.find(col_name);
+ if (it != static_values_map.end()) {
+ _partition_column_static_values[i] = it->second;
+ _partition_column_is_static[i] = 1;
+ } else {
+ dynamic_count++;
+ }
+ }
+
+ // Check if all partition columns are statically specified
+ _is_full_static_partition = (dynamic_count == 0);
+
+ // Build static partition path prefix
+ _static_partition_path = _build_static_partition_path();
+
+ // For full static mode, build complete partition value list
+ if (_is_full_static_partition) {
+ _static_partition_value_list = _partition_column_static_values;
+ }
+}
+
+/**
+ * Builds the partition path string for static partition columns.
+ * Only static partition columns are included, dynamic ones are skipped.
+ * Format: "column1=value1/column2=value2/..."
+ * Example: "year=2023/month=12"
+ */
+std::string VIcebergTableWriter::_build_static_partition_path() {
+ std::stringstream ss;
+ bool first = true;
+ for (size_t i = 0; i < _iceberg_partition_columns.size(); ++i) {
+ if (_partition_column_is_static[i]) {
+ if (!first) {
+ ss << "/";
+ }
+ first = false;
+ ss << _escape(_iceberg_partition_columns[i].field().name()) << "="
+ << _escape(_partition_column_static_values[i]);
+ }
+ }
+ return ss.str();
+}
+
Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) {
SCOPED_RAW_TIMER(&_send_data_ns);
if (block.rows() == 0) {
@@ -129,6 +196,49 @@ Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block)
std::unordered_map<std::shared_ptr<VIcebergPartitionWriter>,
IColumn::Filter> writer_positions;
_row_count += output_block.rows();
+    // Case 1: Full static partition - all data goes to a single partition
+    if (_is_full_static_partition) {
+        std::shared_ptr<VIcebergPartitionWriter> writer;
+        {
+            SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
+            auto writer_iter = _partitions_to_writers.find(_static_partition_path);
+            if (writer_iter == _partitions_to_writers.end()) {
+                try {
+                    writer = _create_partition_writer(nullptr, -1);
+                } catch (doris::Exception& e) {
+                    return e.to_status();
+                }
+                _partitions_to_writers.insert({_static_partition_path, writer});
+                RETURN_IF_ERROR(writer->open(_state, _operator_profile));
+            } else {
+                if (writer_iter->second->written_len() > _target_file_size_bytes) {
+                    std::string file_name(writer_iter->second->file_name());
+                    int file_name_index = writer_iter->second->file_name_index();
+                    {
+                        SCOPED_RAW_TIMER(&_close_ns);
+                        static_cast<void>(writer_iter->second->close(Status::OK()));
+                    }
+                    _partitions_to_writers.erase(writer_iter);
+                    try {
+                        writer = _create_partition_writer(nullptr, -1, &file_name, file_name_index + 1);
+                    } catch (doris::Exception& e) {
+                        return e.to_status();
+                    }
+                    _partitions_to_writers.insert({_static_partition_path, writer});
+                    RETURN_IF_ERROR(writer->open(_state, _operator_profile));
+                } else {
+                    writer = writer_iter->second;
+                }
+            }
+        }
+        SCOPED_RAW_TIMER(&_partition_writers_write_ns);
+        output_block.erase(_non_write_columns_indices);
+        RETURN_IF_ERROR(writer->write(output_block));
+        return Status::OK();
+    }
+
+ // Case 2: Non-partitioned table
if (_iceberg_partition_columns.empty()) {
std::shared_ptr<VIcebergPartitionWriter> writer;
{
@@ -174,9 +284,24 @@ Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block)
     Block transformed_block;
     SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
     transformed_block.reserve(_iceberg_partition_columns.size());
-    for (auto& iceberg_partition_columns : _iceberg_partition_columns) {
-        transformed_block.insert(iceberg_partition_columns.partition_column_transform().apply(
-                output_block, iceberg_partition_columns.source_idx()));
+    for (int i = 0; i < _iceberg_partition_columns.size(); ++i) {
+        auto& iceberg_partition_columns = _iceberg_partition_columns[i];
+        if (_has_static_partition && _partition_column_is_static[i]) {
+            auto result_type =
+                    iceberg_partition_columns.partition_column_transform().get_result_type();
+            auto data_col = result_type->create_column();
+            StringRef str_ref(_partition_column_static_values[i].data(),
+                              _partition_column_static_values[i].size());
+            DataTypeSerDe::FormatOptions options;
+            RETURN_IF_ERROR(result_type->get_serde()->from_string(str_ref, *data_col, options));
+            auto col = ColumnConst::create(std::move(data_col), output_block.rows());
+            transformed_block.insert(
+                    {std::move(col), result_type, iceberg_partition_columns.field().name()});
+        } else {
+            transformed_block.insert(
+                    iceberg_partition_columns.partition_column_transform().apply(
+                            output_block, iceberg_partition_columns.source_idx()));
+        }
     }
for (int i = 0; i < output_block.rows(); ++i) {
std::optional<PartitionData> partition_data;
@@ -263,8 +388,7 @@ Status VIcebergTableWriter::_filter_block(doris::vectorized::Block& block,
const ColumnsWithTypeAndName& columns_with_type_and_name =
block.get_columns_with_type_and_name();
vectorized::ColumnsWithTypeAndName result_columns;
- for (int i = 0; i < columns_with_type_and_name.size(); ++i) {
- const auto& col = columns_with_type_and_name[i];
+ for (const auto& col : columns_with_type_and_name) {
result_columns.emplace_back(col.column->clone_resized(col.column->size()),
col.type,
col.name);
}
@@ -317,14 +441,24 @@ std::string VIcebergTableWriter::_partition_to_path(const
doris::iceberg::Struct
std::stringstream ss;
for (size_t i = 0; i < _iceberg_partition_columns.size(); i++) {
auto& iceberg_partition_column = _iceberg_partition_columns[i];
- std::string value_string =
-
iceberg_partition_column.partition_column_transform().to_human_string(
-
iceberg_partition_column.partition_column_transform().get_result_type(),
- data.get(i));
+ const std::string& col_name = iceberg_partition_column.field().name();
+
+ std::string value_string;
+ // In hybrid mode, check if this column is statically specified
+ if (_has_static_partition && _partition_column_is_static[i]) {
+ // Use static partition value
+ value_string = _partition_column_static_values[i];
+ } else {
+ // Compute from data (dynamic partition)
+ value_string =
iceberg_partition_column.partition_column_transform().to_human_string(
+
iceberg_partition_column.partition_column_transform().get_result_type(),
+ data.get(i));
+ }
+
if (i > 0) {
ss << "/";
}
- ss << _escape(iceberg_partition_column.field().name()) << '=' <<
_escape(value_string);
+ ss << _escape(col_name) << '=' << _escape(value_string);
}
return ss.str();
@@ -340,10 +474,18 @@ std::vector<std::string>
VIcebergTableWriter::_partition_values(
partition_values.reserve(_iceberg_partition_columns.size());
for (size_t i = 0; i < _iceberg_partition_columns.size(); i++) {
auto& iceberg_partition_column = _iceberg_partition_columns[i];
- partition_values.emplace_back(
-
iceberg_partition_column.partition_column_transform().get_partition_value(
-
iceberg_partition_column.partition_column_transform().get_result_type(),
- data.get(i)));
+
+ // In hybrid mode, check if this column is statically specified
+ if (_has_static_partition && _partition_column_is_static[i]) {
+ // Use static partition value
+ partition_values.emplace_back(_partition_column_static_values[i]);
+ } else {
+ // Compute from data (dynamic partition)
+ partition_values.emplace_back(
+
iceberg_partition_column.partition_column_transform().get_partition_value(
+
iceberg_partition_column.partition_column_transform().get_result_type(),
+ data.get(i)));
+ }
}
return partition_values;
@@ -359,7 +501,22 @@ std::shared_ptr<VIcebergPartitionWriter>
VIcebergTableWriter::_create_partition_
std::string original_write_path;
std::string target_path;
- if (transformed_block != nullptr) {
+ // Case 1: Full static partition - use pre-computed static partition path
and values
+ if (_is_full_static_partition) {
+ partition_values = _static_partition_value_list;
+ if (!_static_partition_path.empty()) {
+ original_write_path = fmt::format("{}/{}",
iceberg_table_sink.original_output_path,
+ _static_partition_path);
+ target_path = fmt::format("{}/{}", output_path,
_static_partition_path);
+ write_path = fmt::format("{}/{}", output_path,
_static_partition_path);
+ } else {
+ original_write_path = iceberg_table_sink.original_output_path;
+ target_path = output_path;
+ write_path = output_path;
+ }
+ } else if (transformed_block != nullptr) {
+ // Case 2: Dynamic partition or Hybrid mode (partial static + partial
dynamic)
+ // _partition_to_path and _partition_values already handle hybrid mode
internally
PartitionData partition_data = _get_partition_data(transformed_block,
position);
std::string partition_path = _partition_to_path(partition_data);
partition_values = _partition_values(partition_data);
@@ -368,16 +525,18 @@ std::shared_ptr<VIcebergPartitionWriter>
VIcebergTableWriter::_create_partition_
target_path = fmt::format("{}/{}", output_path, partition_path);
write_path = fmt::format("{}/{}", output_path, partition_path);
} else {
+ // Case 3: Non-partitioned table
original_write_path = iceberg_table_sink.original_output_path;
target_path = output_path;
write_path = output_path;
}
- VIcebergPartitionWriter::WriteInfo write_info = {std::move(write_path),
-
std::move(original_write_path),
- std::move(target_path),
-
iceberg_table_sink.file_type,
- {}};
+ VIcebergPartitionWriter::WriteInfo write_info = {
+ .write_path = std::move(write_path),
+ .original_write_path = std::move(original_write_path),
+ .target_path = std::move(target_path),
+ .file_type = iceberg_table_sink.file_type,
+ .broker_addresses = {}};
if (iceberg_table_sink.__isset.broker_addresses) {
write_info.broker_addresses.assign(iceberg_table_sink.broker_addresses.begin(),
iceberg_table_sink.broker_addresses.end());
@@ -387,7 +546,7 @@ std::shared_ptr<VIcebergPartitionWriter> VIcebergTableWriter::_create_partition_
     std::vector<std::string> column_names;
     column_names.reserve(_write_output_vexpr_ctxs.size());
     for (int i = 0; i < _schema->columns().size(); i++) {
-        if (_non_write_columns_indices.find(i) == _non_write_columns_indices.end()) {
+        if (!_non_write_columns_indices.contains(i)) {
            column_names.emplace_back(_schema->columns()[i].field_name());
        }
    }
@@ -406,14 +565,18 @@ PartitionData
VIcebergTableWriter::_get_partition_data(vectorized::Block* transf
values.reserve(_iceberg_partition_columns.size());
int column_idx = 0;
for (auto& iceberg_partition_column : _iceberg_partition_columns) {
- const vectorized::ColumnWithTypeAndName& partition_column =
- transformed_block->get_by_position(column_idx);
- auto value =
-
_get_iceberg_partition_value(iceberg_partition_column.partition_column_transform()
- .get_result_type()
- ->get_primitive_type(),
- partition_column, position);
- values.emplace_back(value);
+ if (_has_static_partition && _partition_column_is_static[column_idx]) {
+ values.emplace_back();
+ } else {
+ const vectorized::ColumnWithTypeAndName& partition_column =
+ transformed_block->get_by_position(column_idx);
+ auto value = _get_iceberg_partition_value(
+ iceberg_partition_column.partition_column_transform()
+ .get_result_type()
+ ->get_primitive_type(),
+ partition_column, position);
+ values.emplace_back(value);
+ }
++column_idx;
}
return PartitionData(std::move(values));
@@ -426,11 +589,11 @@ std::any
VIcebergTableWriter::_get_iceberg_partition_value(
ColumnPtr col_ptr =
partition_column.column->convert_to_full_column_if_const();
CHECK(col_ptr);
if (col_ptr->is_nullable()) {
- const ColumnNullable* nullable_column =
+ const auto* nullable_column =
reinterpret_cast<const
vectorized::ColumnNullable*>(col_ptr.get());
- auto* __restrict null_map_data =
nullable_column->get_null_map_data().data();
+ const auto* __restrict null_map_data =
nullable_column->get_null_map_data().data();
if (null_map_data[position]) {
- return std::any();
+ return {};
}
col_ptr = nullable_column->get_nested_column_ptr();
}
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
index 4ea3e2135bb..b6e3c4edc57 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
@@ -97,6 +97,11 @@ private:
std::string _escape(const std::string& path);
    std::vector<std::string> _partition_values(const doris::iceberg::StructLike& data);
+ // Initialize static partition values from Thrift config
+ void _init_static_partition_values();
+ // Build static partition path from static partition values
+ std::string _build_static_partition_path();
+
std::shared_ptr<VIcebergPartitionWriter> _create_partition_writer(
vectorized::Block* transformed_block, int position,
const std::string* file_name = nullptr, int file_name_index = 0);
@@ -125,6 +130,22 @@ private:
std::set<size_t> _non_write_columns_indices;
std::vector<IcebergPartitionColumn> _iceberg_partition_columns;
+    // Static partition values for each partition column (indexed by column index)
+    // If _partition_column_is_static[i] is true, this stores the static value.
+    std::vector<std::string> _partition_column_static_values;
+    // Flags to indicate if the partition column at index i is static
+    std::vector<uint8_t> _partition_column_is_static;
+
+    // Whether any static partition columns are specified
+    bool _has_static_partition = false;
+    // Whether ALL partition columns are statically specified (full static mode)
+    // If false but _has_static_partition is true, it's partial static (hybrid) mode
+    bool _is_full_static_partition = false;
+    // Pre-computed static partition path prefix (for full static mode, this is the complete path)
+    std::string _static_partition_path;
+    // Pre-computed static partition value list (for full static mode only)
+    std::vector<std::string> _static_partition_value_list;
+
std::unordered_map<std::string, std::shared_ptr<VIcebergPartitionWriter>>
_partitions_to_writers;
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index d0caecc2bd6..003f9c36b34 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -988,10 +988,14 @@ partitionSpec
: TEMPORARY? (PARTITION | PARTITIONS) partitions=identifierList
| TEMPORARY? PARTITION partition=errorCapturingIdentifier
| (PARTITION | PARTITIONS) LEFT_PAREN ASTERISK RIGHT_PAREN // for auto
detect partition in overwriting
- // TODO: support analyze external table partition spec
https://github.com/apache/doris/pull/24154
+ | PARTITION LEFT_PAREN partitionKeyValue (COMMA partitionKeyValue)*
RIGHT_PAREN // static partition: PARTITION (col1='val1', col2='val2')
// | PARTITIONS WITH RECENT
;
+partitionKeyValue
+ : identifier EQ expression // col='value' or col=123
+ ;
+
partitionTable
: ((autoPartition=AUTO)? PARTITION BY (RANGE | LIST)?
partitionList=identityOrFunctionList
(LEFT_PAREN (partitions=partitionsDef)? RIGHT_PAREN))
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index b1bbcc920cf..502b1eaa7c8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -37,20 +37,27 @@ import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.types.Types;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
public class IcebergTransaction implements Transaction {
@@ -258,7 +265,12 @@ public class IcebergTransaction implements Transaction {
if (updateMode == TUpdateMode.APPEND) {
commitAppendTxn(pendingResults);
} else {
- commitReplaceTxn(pendingResults);
+ // Check if this is a static partition overwrite
+ if (insertCtx != null && insertCtx.isStaticPartitionOverwrite()) {
+ commitStaticPartitionOverwrite(pendingResults);
+ } else {
+ commitReplaceTxn(pendingResults);
+ }
}
}
@@ -373,4 +385,96 @@ public class IcebergTransaction implements Transaction {
appendPartitionOp.commit();
}
+    /**
+     * Commit static partition overwrite operation
+     * This method uses OverwriteFiles.overwriteByRowFilter() to overwrite only the specified partitions
+     */
+    private void commitStaticPartitionOverwrite(List<WriteResult> pendingResults) {
+        Table icebergTable = transaction.table();
+        PartitionSpec spec = icebergTable.spec();
+        Schema schema = icebergTable.schema();
+
+        // Build partition filter expression from static partition values
+        Expression partitionFilter = buildPartitionFilter(
+                insertCtx.getStaticPartitionValues(), spec, schema);
+
+        // Create OverwriteFiles operation
+        OverwriteFiles overwriteFiles = transaction.newOverwrite();
+        if (branchName != null) {
+            overwriteFiles = overwriteFiles.toBranch(branchName);
+        }
+        overwriteFiles = overwriteFiles.scanManifestsWith(ops.getThreadPoolWithPreAuth());
+
+        // Set partition filter to overwrite only matching partitions
+        overwriteFiles = overwriteFiles.overwriteByRowFilter(partitionFilter);
+
+        // Add new data files
+        for (WriteResult result : pendingResults) {
+            Preconditions.checkState(result.referencedDataFiles().length == 0,
+                    "Should have no referenced data files for static partition overwrite.");
+            Arrays.stream(result.dataFiles()).forEach(overwriteFiles::addFile);
+        }
+
+        // Commit the overwrite operation
+        overwriteFiles.commit();
+    }
+
+ /**
+ * Build partition filter expression from static partition key-value pairs
+ *
+ * @param staticPartitions Map of partition column name to partition value
(as String)
+ * @param spec PartitionSpec of the table
+ * @param schema Schema of the table
+ * @return Iceberg Expression for partition filtering
+ */
+ private Expression buildPartitionFilter(
+ Map<String, String> staticPartitions,
+ PartitionSpec spec,
+ Schema schema) {
+ if (staticPartitions == null || staticPartitions.isEmpty()) {
+ return Expressions.alwaysTrue();
+ }
+
+ List<Expression> predicates = new ArrayList<>();
+
+ for (PartitionField field : spec.fields()) {
+ String partitionColName = field.name();
+ if (staticPartitions.containsKey(partitionColName)) {
+ String partitionValueStr =
staticPartitions.get(partitionColName);
+
+ // Get source field to determine the type
+ Types.NestedField sourceField =
schema.findField(field.sourceId());
+ if (sourceField == null) {
+ throw new RuntimeException(String.format("Source field not
found for partition field: %s",
+ partitionColName));
+ }
+
+ // Convert partition value string to appropriate type
+ Object partitionValue =
IcebergUtils.parsePartitionValueFromString(
+ partitionValueStr, sourceField.type());
+
+ // Build equality expression using source field name (not
partition field name)
+ // For identity partitions, Iceberg requires the source column
name in expressions
+ String sourceColName = sourceField.name();
+ Expression eqExpr;
+ if (partitionValue == null) {
+ eqExpr = Expressions.isNull(sourceColName);
+ } else {
+ eqExpr = Expressions.equal(sourceColName, partitionValue);
+ }
+ predicates.add(eqExpr);
+ }
+ }
+
+ if (predicates.isEmpty()) {
+ return Expressions.alwaysTrue();
+ }
+
+ // Combine all predicates with AND
+ Expression result = predicates.get(0);
+ for (int i = 1; i < predicates.size(); i++) {
+ result = Expressions.and(result, predicates.get(i));
+ }
+ return result;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundIcebergTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundIcebergTableSink.java
index 2632bda2310..765b56de1b9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundIcebergTableSink.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundIcebergTableSink.java
@@ -19,6 +19,7 @@ package org.apache.doris.nereids.analyzer;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
@@ -26,19 +27,24 @@ import
org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
/**
* Represent an iceberg table sink plan node that has not been bound.
*/
public class UnboundIcebergTableSink<CHILD_TYPE extends Plan> extends
UnboundBaseExternalTableSink<CHILD_TYPE> {
+ // Static partition key-value pairs for INSERT OVERWRITE ... PARTITION
+ // (col='val', ...)
+ private final Map<String, Expression> staticPartitionKeyValues;
public UnboundIcebergTableSink(List<String> nameParts, List<String>
colNames, List<String> hints,
List<String> partitions, CHILD_TYPE child) {
this(nameParts, colNames, hints, partitions, DMLCommandType.NONE,
- Optional.empty(), Optional.empty(), child);
+ Optional.empty(), Optional.empty(), child, null);
}
/**
@@ -52,8 +58,35 @@ public class UnboundIcebergTableSink<CHILD_TYPE extends
Plan> extends UnboundBas
Optional<GroupExpression> groupExpression,
Optional<LogicalProperties>
logicalProperties,
CHILD_TYPE child) {
+ this(nameParts, colNames, hints, partitions, dmlCommandType,
+ groupExpression, logicalProperties, child, null);
+ }
+
+ /**
+ * constructor with static partition
+ */
+ public UnboundIcebergTableSink(List<String> nameParts,
+ List<String> colNames,
+ List<String> hints,
+ List<String> partitions,
+ DMLCommandType dmlCommandType,
+ Optional<GroupExpression> groupExpression,
+ Optional<LogicalProperties> logicalProperties,
+ CHILD_TYPE child,
+ Map<String, Expression> staticPartitionKeyValues) {
super(nameParts, PlanType.LOGICAL_UNBOUND_ICEBERG_TABLE_SINK,
ImmutableList.of(), groupExpression,
logicalProperties, colNames, dmlCommandType, child, hints,
partitions);
+ this.staticPartitionKeyValues = staticPartitionKeyValues != null
+ ? ImmutableMap.copyOf(staticPartitionKeyValues)
+ : null;
+ }
+
+ public Map<String, Expression> getStaticPartitionKeyValues() {
+ return staticPartitionKeyValues;
+ }
+
+ public boolean hasStaticPartition() {
+ return staticPartitionKeyValues != null &&
!staticPartitionKeyValues.isEmpty();
}
@Override
@@ -61,7 +94,7 @@ public class UnboundIcebergTableSink<CHILD_TYPE extends Plan>
extends UnboundBas
Preconditions.checkArgument(children.size() == 1,
"UnboundIcebergTableSink only accepts one child");
return new UnboundIcebergTableSink<>(nameParts, colNames, hints,
partitions,
- dmlCommandType, groupExpression, Optional.empty(),
children.get(0));
+ dmlCommandType, groupExpression, Optional.empty(),
children.get(0), staticPartitionKeyValues);
}
@Override
@@ -72,13 +105,14 @@ public class UnboundIcebergTableSink<CHILD_TYPE extends
Plan> extends UnboundBas
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression)
{
return new UnboundIcebergTableSink<>(nameParts, colNames, hints,
partitions,
- dmlCommandType, groupExpression,
Optional.of(getLogicalProperties()), child());
+ dmlCommandType, groupExpression,
Optional.of(getLogicalProperties()), child(),
+ staticPartitionKeyValues);
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression>
groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan>
children) {
return new UnboundIcebergTableSink<>(nameParts, colNames, hints,
partitions,
- dmlCommandType, groupExpression, logicalProperties,
children.get(0));
+ dmlCommandType, groupExpression, logicalProperties,
children.get(0), staticPartitionKeyValues);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
index a5dfa917fe3..e579c12d82a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
@@ -27,6 +27,7 @@ import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.dictionary.Dictionary;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.exceptions.ParseException;
+import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
@@ -38,6 +39,7 @@ import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
import com.google.common.collect.ImmutableList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
/**
@@ -70,6 +72,18 @@ public class UnboundTableSinkCreator {
List<String> colNames, List<String> hints, boolean
temporaryPartition, List<String> partitions,
boolean isPartialUpdate, TPartialUpdateNewRowPolicy
partialUpdateNewKeyPolicy,
DMLCommandType dmlCommandType, LogicalPlan plan) {
+ return createUnboundTableSink(nameParts, colNames, hints,
temporaryPartition, partitions,
+ isPartialUpdate, partialUpdateNewKeyPolicy, dmlCommandType,
plan, null);
+ }
+
+ /**
+ * create unbound sink for DML plan with static partition support for
Iceberg.
+ */
+ public static LogicalSink<? extends Plan>
createUnboundTableSink(List<String> nameParts,
+ List<String> colNames, List<String> hints, boolean
temporaryPartition, List<String> partitions,
+ boolean isPartialUpdate, TPartialUpdateNewRowPolicy
partialUpdateNewKeyPolicy,
+ DMLCommandType dmlCommandType, LogicalPlan plan,
+ Map<String, Expression> staticPartitionKeyValues) {
String catalogName =
RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0);
CatalogIf<?> curCatalog =
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
if (curCatalog instanceof InternalCatalog) {
@@ -81,7 +95,7 @@ public class UnboundTableSinkCreator {
dmlCommandType, Optional.empty(), Optional.empty(), plan);
} else if (curCatalog instanceof IcebergExternalCatalog) {
return new UnboundIcebergTableSink<>(nameParts, colNames, hints,
partitions,
- dmlCommandType, Optional.empty(), Optional.empty(), plan);
+ dmlCommandType, Optional.empty(), Optional.empty(), plan,
staticPartitionKeyValues);
} else if (curCatalog instanceof JdbcExternalCatalog) {
return new UnboundJdbcTableSink<>(nameParts, colNames, hints,
partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
@@ -90,13 +104,15 @@ public class UnboundTableSinkCreator {
}
/**
- * create unbound sink for DML plan with auto detect overwrite partition
enable.
+ * create unbound sink for DML plan with auto detect overwrite partition
enable
+ * and static partition support for Iceberg.
+ * TODO: staticPartitionKeyValues is only used for Iceberg, support other
catalog types in future.
*/
public static LogicalSink<? extends Plan>
createUnboundTableSinkMaybeOverwrite(List<String> nameParts,
List<String> colNames, List<String> hints, boolean
temporaryPartition, List<String> partitions,
boolean isAutoDetectPartition, boolean isOverwrite, boolean
isPartialUpdate,
TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy,
DMLCommandType dmlCommandType,
- LogicalPlan plan) {
+ LogicalPlan plan, Map<String, Expression>
staticPartitionKeyValues) {
if (isAutoDetectPartition) { // partitions is null
if (!isOverwrite) {
throw new ParseException("ASTERISK is only supported in
overwrite partition for OLAP table");
@@ -117,7 +133,7 @@ public class UnboundTableSinkCreator {
dmlCommandType, Optional.empty(), Optional.empty(), plan);
} else if (curCatalog instanceof IcebergExternalCatalog &&
!isAutoDetectPartition) {
return new UnboundIcebergTableSink<>(nameParts, colNames, hints,
partitions,
- dmlCommandType, Optional.empty(), Optional.empty(), plan);
+ dmlCommandType, Optional.empty(), Optional.empty(), plan,
staticPartitionKeyValues);
} else if (curCatalog instanceof JdbcExternalCatalog) {
return new UnboundJdbcTableSink<>(nameParts, colNames, hints,
partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/InsertPartitionSpec.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/InsertPartitionSpec.java
new file mode 100644
index 00000000000..6b9e7d7b24c
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/InsertPartitionSpec.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.parser;
+
+import org.apache.doris.nereids.trees.expressions.Expression;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Unified partition specification for INSERT statements.
+ * Supports multiple partition modes:
+ * 1. Auto-detect: PARTITION (*) - partitions = null
+ * 2. Dynamic partition by name: PARTITION (p1, p2) - partitionNames is
+ * non-empty
+ * 3. Static partition with values: PARTITION (col='val', ...) -
+ * staticPartitionValues is non-empty
+ * 4. No partition specified: all fields are empty/default
+ */
+public class InsertPartitionSpec {
+
+ /** Static partition key-value pairs: col_name -> value expression */
+ private final Map<String, Expression> staticPartitionValues;
+
+ /** Dynamic partition names (e.g., PARTITION (p1, p2)) */
+ private final List<String> partitionNames;
+
+ /** Whether it's a temporary partition */
+ private final boolean isTemporary;
+
+ /** Whether it's auto-detect mode (PARTITION (*)) */
+ private final boolean isAutoDetect;
+
+ private InsertPartitionSpec(Map<String, Expression> staticPartitionValues,
+ List<String> partitionNames, boolean isTemporary, boolean
isAutoDetect) {
+ this.staticPartitionValues = staticPartitionValues != null
+ ? ImmutableMap.copyOf(staticPartitionValues)
+ : ImmutableMap.of();
+ this.partitionNames = partitionNames != null
+ ? ImmutableList.copyOf(partitionNames)
+ : ImmutableList.of();
+ this.isTemporary = isTemporary;
+ this.isAutoDetect = isAutoDetect;
+ }
+
+ /** No partition specified */
+ public static InsertPartitionSpec none() {
+ return new InsertPartitionSpec(null, null, false, false);
+ }
+
+ /** Auto-detect partition: PARTITION (*) */
+ public static InsertPartitionSpec autoDetect(boolean isTemporary) {
+ return new InsertPartitionSpec(null, null, isTemporary, true);
+ }
+
+ /** Dynamic partition by name: PARTITION (p1, p2) */
+ public static InsertPartitionSpec dynamicPartition(List<String>
partitionNames, boolean isTemporary) {
+ return new InsertPartitionSpec(null, partitionNames, isTemporary,
false);
+ }
+
+ /** Static partition with values: PARTITION (col='val', ...) */
+ public static InsertPartitionSpec staticPartition(Map<String, Expression>
staticValues, boolean isTemporary) {
+ return new InsertPartitionSpec(staticValues, null, isTemporary, false);
+ }
+
+ /** Check if this is a static partition with key-value pairs */
+ public boolean isStaticPartition() {
+ return staticPartitionValues != null &&
!staticPartitionValues.isEmpty();
+ }
+
+ /** Check if this has dynamic partition names */
+ public boolean hasDynamicPartitionNames() {
+ return partitionNames != null && !partitionNames.isEmpty();
+ }
+
+ public Map<String, Expression> getStaticPartitionValues() {
+ return staticPartitionValues;
+ }
+
+ public List<String> getPartitionNames() {
+ return partitionNames;
+ }
+
+ public boolean isTemporary() {
+ return isTemporary;
+ }
+
+ public boolean isAutoDetect() {
+ return isAutoDetect;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index da410826cd5..427dd78a231 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -1347,27 +1347,26 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
branchName = Optional.of(ctx.optSpecBranch().name.getText());
}
List<String> colNames = ctx.cols == null ? ImmutableList.of() :
visitIdentifierList(ctx.cols);
- // TODO visit partitionSpecCtx
LogicalPlan plan = visitQuery(ctx.query());
- // partitionSpec may be NULL. means auto detect partition. only
available when IOT
- Pair<Boolean, List<String>> partitionSpec =
visitPartitionSpec(ctx.partitionSpec());
- // partitionSpec.second :
- // null - auto detect
- // zero - whole table
- // others - specific partitions
- boolean isAutoDetect = partitionSpec.second == null;
+
+ // Parse partition specification using unified method
+ InsertPartitionSpec partitionSpec =
parseInsertPartitionSpec(ctx.partitionSpec());
+
+ // Unified sink creation for all catalog types (including Iceberg
static
+ // partition)
LogicalSink<?> sink =
UnboundTableSinkCreator.createUnboundTableSinkMaybeOverwrite(
tableName.build(),
colNames,
- ImmutableList.of(), // hints
- partitionSpec.first, // isTemp
- partitionSpec.second, // partition names
- isAutoDetect,
+ ImmutableList.of(),
+ partitionSpec.isTemporary(),
+ partitionSpec.isAutoDetect() ? null :
partitionSpec.getPartitionNames(),
+ partitionSpec.isAutoDetect(),
isOverwrite,
ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(),
ConnectContext.get().getSessionVariable().getPartialUpdateNewRowPolicy(),
ctx.tableId == null ? DMLCommandType.INSERT :
DMLCommandType.GROUP_COMMIT,
- plan);
+ plan,
+ partitionSpec.isStaticPartition() ?
partitionSpec.getStaticPartitionValues() : null);
Optional<LogicalPlan> cte = Optional.empty();
if (ctx.cte() != null) {
cte = Optional.ofNullable(withCte(plan, ctx.cte()));
@@ -1446,6 +1445,11 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
partitions = null;
} else if (ctx.partition != null) {
partitions = ImmutableList.of(ctx.partition.getText());
+ } else if (ctx.partitionKeyValue() != null &&
!ctx.partitionKeyValue().isEmpty()) {
+ // Static partition: PARTITION (col='val', ...)
+ throw new ParseException(
+ "Static partition syntax PARTITION (col='val', ...) is
only supported in INSERT statements",
+ ctx);
} else {
partitions = visitIdentifierList(ctx.partitions);
}
@@ -1453,6 +1457,52 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
return Pair.of(temporaryPartition, partitions);
}
+ /**
+ * Parse partition specification for INSERT statements.
+ * Returns a unified InsertPartitionSpec that handles all partition modes:
+ * - Auto-detect: PARTITION (*)
+ * - Dynamic partition by name: PARTITION (p1, p2)
+ * - Static partition with values: PARTITION (col='val', ...)
+ * - No partition specified
+ */
+ private InsertPartitionSpec parseInsertPartitionSpec(PartitionSpecContext
ctx) {
+ if (ctx == null) {
+ return InsertPartitionSpec.none();
+ }
+
+ boolean isTemporary = ctx.TEMPORARY() != null;
+
+ // PARTITION (*)
+ if (ctx.ASTERISK() != null) {
+ return InsertPartitionSpec.autoDetect(isTemporary);
+ }
+
+ // PARTITION partition_name (single partition)
+ if (ctx.partition != null) {
+ return InsertPartitionSpec.dynamicPartition(
+ ImmutableList.of(ctx.partition.getText()), isTemporary);
+ }
+
+ // PARTITION (col1='val1', col2='val2') - static partition
+ if (ctx.partitionKeyValue() != null &&
!ctx.partitionKeyValue().isEmpty()) {
+ Map<String, Expression> staticValues = Maps.newLinkedHashMap();
+ for (DorisParser.PartitionKeyValueContext kvCtx :
ctx.partitionKeyValue()) {
+ String colName = kvCtx.identifier().getText();
+ Expression valueExpr = typedVisit(kvCtx.expression());
+ staticValues.put(colName, valueExpr);
+ }
+ return InsertPartitionSpec.staticPartition(staticValues,
isTemporary);
+ }
+
+ // PARTITION (p1, p2, ...) - dynamic partition list
+ if (ctx.partitions != null) {
+ List<String> partitionNames = visitIdentifierList(ctx.partitions);
+ return InsertPartitionSpec.dynamicPartition(partitionNames,
isTemporary);
+ }
+
+ return InsertPartitionSpec.none();
+ }
+
@Override
public Command visitCreateMTMV(CreateMTMVContext ctx) {
if (ctx.buildMode() == null && ctx.refreshMethod() == null &&
ctx.refreshTrigger() == null
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
index df6dec2db43..8cf2e105816 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
@@ -102,12 +102,17 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -603,9 +608,27 @@ public class BindSink implements AnalysisRuleFactory {
IcebergExternalTable table = pair.second;
LogicalPlan child = ((LogicalPlan) sink.child());
+ // Get static partition columns if present
+ Map<String, Expression> staticPartitions =
sink.getStaticPartitionKeyValues();
+ Set<String> staticPartitionColNames = staticPartitions != null
+ ? staticPartitions.keySet()
+ : Sets.newHashSet();
+
+ // Validate static partition if present
+ if (sink.hasStaticPartition()) {
+ validateStaticPartition(sink, table);
+ }
+
+ // Build bindColumns: exclude static partition columns from the
columns that
+ // need to come from SELECT
+ // Because static partition column values come from PARTITION clause,
not from
+ // SELECT
List<Column> bindColumns;
if (sink.getColNames().isEmpty()) {
- bindColumns =
table.getBaseSchema(true).stream().collect(ImmutableList.toImmutableList());
+ // When no column names specified, include all
non-static-partition columns
+ bindColumns = table.getBaseSchema(true).stream()
+ .filter(col ->
!staticPartitionColNames.contains(col.getName()))
+ .collect(ImmutableList.toImmutableList());
} else {
bindColumns = sink.getColNames().stream().map(cn -> {
Column column = table.getColumn(cn);
@@ -616,6 +639,7 @@ public class BindSink implements AnalysisRuleFactory {
return column;
}).collect(ImmutableList.toImmutableList());
}
+
LogicalIcebergTableSink<?> boundSink = new LogicalIcebergTableSink<>(
database,
table,
@@ -627,16 +651,92 @@ public class BindSink implements AnalysisRuleFactory {
Optional.empty(),
Optional.empty(),
child);
- // we need to insert all the columns of the target table
+
+ // Check column count: SELECT columns should match bindColumns
(excluding static
+ // partition columns)
if (boundSink.getCols().size() != child.getOutput().size()) {
- throw new AnalysisException("insert into cols should be
corresponding to the query output");
+ throw new AnalysisException("insert into cols should be
corresponding to the query output. "
+ + "Expected " + boundSink.getCols().size() + " columns but
got " + child.getOutput().size());
}
+
Map<String, NamedExpression> columnToOutput = getColumnToOutput(ctx,
table, false,
boundSink, child);
+
+ // For static partition columns, add constant expressions from
PARTITION clause
+ // This ensures partition column values are written to the data file
+ if (!staticPartitionColNames.isEmpty()) {
+ for (Map.Entry<String, Expression> entry :
staticPartitions.entrySet()) {
+ String colName = entry.getKey();
+ Expression valueExpr = entry.getValue();
+ Column column = table.getColumn(colName);
+ if (column != null) {
+ // Cast the literal to the correct column type
+ Expression castExpr = TypeCoercionUtils.castIfNotSameType(
+ valueExpr,
DataType.fromCatalogType(column.getType()));
+ columnToOutput.put(colName, new Alias(castExpr, colName));
+ }
+ }
+ }
+
LogicalProject<?> fullOutputProject =
getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput);
return boundSink.withChildAndUpdateOutput(fullOutputProject);
}
+ /**
+ * Validate static partition specification for Iceberg table
+ */
+ private void validateStaticPartition(UnboundIcebergTableSink<?> sink,
IcebergExternalTable table) {
+ Map<String, Expression> staticPartitions =
sink.getStaticPartitionKeyValues();
+ if (staticPartitions == null || staticPartitions.isEmpty()) {
+ return;
+ }
+
+ Table icebergTable = table.getIcebergTable();
+ PartitionSpec partitionSpec = icebergTable.spec();
+
+ // Check if table is partitioned
+ if (!partitionSpec.isPartitioned()) {
+ throw new AnalysisException(
+ String.format("Table %s is not partitioned, cannot use
static partition syntax", table.getName()));
+ }
+
+ // Get partition field names
+ Map<String, PartitionField> partitionFieldMap = Maps.newHashMap();
+ for (PartitionField field : partitionSpec.fields()) {
+ String fieldName = field.name();
+ partitionFieldMap.put(fieldName, field);
+ }
+
+ // Validate each static partition column
+ for (Map.Entry<String, Expression> entry :
staticPartitions.entrySet()) {
+ String partitionColName = entry.getKey();
+ Expression partitionValue = entry.getValue();
+
+ // 1. Check if partition column exists
+ if (!partitionFieldMap.containsKey(partitionColName)) {
+ throw new AnalysisException(
+ String.format("Unknown partition column '%s' in table
'%s'. Available partition columns: %s",
+ partitionColName, table.getName(),
partitionFieldMap.keySet()));
+ }
+
+ // 2. Check if it's an identity partition.
+ // Static partition overwrite is only supported for identity
partitions.
+ PartitionField field = partitionFieldMap.get(partitionColName);
+ if (!field.transform().isIdentity()) {
+ throw new AnalysisException(
+ String.format("Cannot use static partition syntax for
non-identity partition field '%s'"
+ + " (transform: %s).", partitionColName,
field.transform().toString()));
+ }
+
+ // 3. Validate partition value type must be a literal
+ if (!(partitionValue instanceof Literal)) {
+ throw new AnalysisException(
+ String.format("Partition value for column '%s' must be
a literal, but got: %s",
+ partitionColName, partitionValue));
+ }
+ }
+ }
+
private Plan bindJdbcTableSink(MatchingContext<UnboundJdbcTableSink<Plan>>
ctx) {
UnboundJdbcTableSink<?> sink = ctx.root;
Pair<JdbcExternalDatabase, JdbcExternalTable> pair =
bind(ctx.cascadesContext, sink);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertCommandContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertCommandContext.java
index 6a921cee10c..45bcb5092c8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertCommandContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertCommandContext.java
@@ -17,6 +17,9 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
+import com.google.common.collect.Maps;
+
+import java.util.Map;
import java.util.Optional;
/**
@@ -24,6 +27,9 @@ import java.util.Optional;
*/
public class IcebergInsertCommandContext extends
BaseExternalTableInsertCommandContext {
private Optional<String> branchName = Optional.empty();
+ // Static partition key-value pairs for INSERT OVERWRITE ... PARTITION
+ // (col='val', ...)
+ private Map<String, String> staticPartitionValues = Maps.newHashMap();
public Optional<String> getBranchName() {
return branchName;
@@ -32,4 +38,21 @@ public class IcebergInsertCommandContext extends
BaseExternalTableInsertCommandC
public void setBranchName(Optional<String> branchName) {
this.branchName = branchName;
}
+
+ public Map<String, String> getStaticPartitionValues() {
+ return staticPartitionValues;
+ }
+
+ public void setStaticPartitionValues(Map<String, String>
staticPartitionValues) {
+ this.staticPartitionValues = staticPartitionValues != null
+ ? Maps.newHashMap(staticPartitionValues)
+ : Maps.newHashMap();
+ }
+
+ /**
+ * Check if this is a static partition overwrite
+ */
+ public boolean isStaticPartitionOverwrite() {
+ return isOverwrite() && !staticPartitionValues.isEmpty();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
index b8b20f61527..e0ba01277b8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
@@ -43,6 +43,8 @@ import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.TreeNode;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
@@ -65,6 +67,7 @@ import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -72,6 +75,7 @@ import org.awaitility.Awaitility;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -366,9 +370,11 @@ public class InsertOverwriteTableCommand extends Command
implements NeedAuditEnc
false,
TPartialUpdateNewRowPolicy.APPEND,
sink.getDMLCommandType(),
- (LogicalPlan) (sink.child(0)));
+ (LogicalPlan) (sink.child(0)),
+ sink.getStaticPartitionKeyValues());
insertCtx = new IcebergInsertCommandContext();
((IcebergInsertCommandContext) insertCtx).setOverwrite(true);
+ setStaticPartitionToContext(sink, (IcebergInsertCommandContext)
insertCtx);
branchName.ifPresent(notUsed -> ((IcebergInsertCommandContext)
insertCtx).setBranchName(branchName));
} else {
throw new UserException("Current catalog does not support insert
overwrite yet.");
@@ -394,16 +400,33 @@ public class InsertOverwriteTableCommand extends Command
implements NeedAuditEnc
} else if (logicalQuery instanceof UnboundHiveTableSink) {
insertCtx = new HiveInsertCommandContext();
((HiveInsertCommandContext) insertCtx).setOverwrite(true);
- } else if (logicalQuery instanceof UnboundIcebergTableSink) {
- insertCtx = new IcebergInsertCommandContext();
- ((IcebergInsertCommandContext) insertCtx).setOverwrite(true);
- branchName.ifPresent(notUsed -> ((IcebergInsertCommandContext)
insertCtx).setBranchName(branchName));
} else {
- throw new UserException("Current catalog does not support insert
overwrite yet.");
+ throw new UserException("Current catalog does not support insert
overwrite with auto-detect partition.");
}
runInsertCommand(logicalQuery, insertCtx, ctx, executor);
}
+ /**
+ * Extract static partition information from sink and set to context.
+ */
+ private void setStaticPartitionToContext(UnboundIcebergTableSink<?> sink,
+ IcebergInsertCommandContext insertCtx) {
+ if (sink.hasStaticPartition()) {
+ Map<String, Expression> staticPartitions =
sink.getStaticPartitionKeyValues();
+ Map<String, String> staticPartitionValues = Maps.newHashMap();
+ for (Map.Entry<String, Expression> entry :
staticPartitions.entrySet()) {
+ Expression expr = entry.getValue();
+ if (expr instanceof Literal) {
+ staticPartitionValues.put(entry.getKey(), ((Literal)
expr).getStringValue());
+ } else {
+ throw new AnalysisException(
+ String.format("Static partition value must be a
literal, but got: %s", expr));
+ }
+ }
+ insertCtx.setStaticPartitionValues(staticPartitionValues);
+ }
+ }
+
@Override
public Plan getExplainPlan(ConnectContext ctx) {
Optional<CascadesContext> analyzeContext = Optional.of(
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
index c536844c649..3968ada89ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
@@ -161,6 +161,14 @@ public class IcebergTableSink extends
BaseExternalTableDataSink {
if (insertCtx.isPresent()) {
IcebergInsertCommandContext context =
(IcebergInsertCommandContext) insertCtx.get();
tSink.setOverwrite(context.isOverwrite());
+
+ // Pass static partition values to BE for static partition
overwrite
+ if (context.isStaticPartitionOverwrite()) {
+ Map<String, String> staticPartitionValues =
context.getStaticPartitionValues();
+ if (staticPartitionValues != null &&
!staticPartitionValues.isEmpty()) {
+ tSink.setStaticPartitionValues(staticPartitionValues);
+ }
+ }
}
tDataSink = new TDataSink(TDataSinkType.ICEBERG_TABLE_SINK);
tDataSink.setIcebergTableSink(tSink);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/InsertPartitionSpecTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/InsertPartitionSpecTest.java
new file mode 100644
index 00000000000..a6f3402cade
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/InsertPartitionSpecTest.java
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.parser;
+
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.StringLiteral;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Unit tests for {@link InsertPartitionSpec}.
+ */
+public class InsertPartitionSpecTest {
+
+ @Test
+ public void testNone() {
+ InsertPartitionSpec spec = InsertPartitionSpec.none();
+
+ Assertions.assertFalse(spec.isStaticPartition());
+ Assertions.assertFalse(spec.hasDynamicPartitionNames());
+ Assertions.assertFalse(spec.isTemporary());
+ Assertions.assertFalse(spec.isAutoDetect());
+ Assertions.assertTrue(spec.getStaticPartitionValues().isEmpty());
+ Assertions.assertTrue(spec.getPartitionNames().isEmpty());
+ }
+
+ @Test
+ public void testAutoDetect() {
+ // Non-temporary auto-detect
+ InsertPartitionSpec spec = InsertPartitionSpec.autoDetect(false);
+
+ Assertions.assertFalse(spec.isStaticPartition());
+ Assertions.assertFalse(spec.hasDynamicPartitionNames());
+ Assertions.assertFalse(spec.isTemporary());
+ Assertions.assertTrue(spec.isAutoDetect());
+ Assertions.assertTrue(spec.getStaticPartitionValues().isEmpty());
+ Assertions.assertTrue(spec.getPartitionNames().isEmpty());
+ }
+
+ @Test
+ public void testAutoDetectTemporary() {
+ // Temporary auto-detect
+ InsertPartitionSpec spec = InsertPartitionSpec.autoDetect(true);
+
+ Assertions.assertFalse(spec.isStaticPartition());
+ Assertions.assertFalse(spec.hasDynamicPartitionNames());
+ Assertions.assertTrue(spec.isTemporary());
+ Assertions.assertTrue(spec.isAutoDetect());
+ }
+
+ @Test
+ public void testDynamicPartition() {
+ List<String> partitionNames = ImmutableList.of("p1", "p2", "p3");
+ InsertPartitionSpec spec =
InsertPartitionSpec.dynamicPartition(partitionNames, false);
+
+ Assertions.assertFalse(spec.isStaticPartition());
+ Assertions.assertTrue(spec.hasDynamicPartitionNames());
+ Assertions.assertFalse(spec.isTemporary());
+ Assertions.assertFalse(spec.isAutoDetect());
+ Assertions.assertEquals(3, spec.getPartitionNames().size());
+ Assertions.assertEquals("p1", spec.getPartitionNames().get(0));
+ Assertions.assertEquals("p2", spec.getPartitionNames().get(1));
+ Assertions.assertEquals("p3", spec.getPartitionNames().get(2));
+ }
+
+ @Test
+ public void testDynamicPartitionTemporary() {
+ List<String> partitionNames = ImmutableList.of("p1");
+ InsertPartitionSpec spec =
InsertPartitionSpec.dynamicPartition(partitionNames, true);
+
+ Assertions.assertFalse(spec.isStaticPartition());
+ Assertions.assertTrue(spec.hasDynamicPartitionNames());
+ Assertions.assertTrue(spec.isTemporary());
+ Assertions.assertFalse(spec.isAutoDetect());
+ Assertions.assertEquals(1, spec.getPartitionNames().size());
+ }
+
+ @Test
+ public void testDynamicPartitionEmpty() {
+ List<String> partitionNames = ImmutableList.of();
+ InsertPartitionSpec spec =
InsertPartitionSpec.dynamicPartition(partitionNames, false);
+
+ Assertions.assertFalse(spec.isStaticPartition());
+ Assertions.assertFalse(spec.hasDynamicPartitionNames());
+ Assertions.assertFalse(spec.isTemporary());
+ Assertions.assertFalse(spec.isAutoDetect());
+ Assertions.assertTrue(spec.getPartitionNames().isEmpty());
+ }
+
+ @Test
+ public void testDynamicPartitionNull() {
+ InsertPartitionSpec spec = InsertPartitionSpec.dynamicPartition(null,
false);
+
+ Assertions.assertFalse(spec.isStaticPartition());
+ Assertions.assertFalse(spec.hasDynamicPartitionNames());
+ Assertions.assertTrue(spec.getPartitionNames().isEmpty());
+ }
+
+ @Test
+ public void testStaticPartition() {
+ Map<String, Expression> staticValues = ImmutableMap.of(
+ "dt", new StringLiteral("2025-01-25"),
+ "region", new StringLiteral("bj"));
+ InsertPartitionSpec spec =
InsertPartitionSpec.staticPartition(staticValues, false);
+
+ Assertions.assertTrue(spec.isStaticPartition());
+ Assertions.assertFalse(spec.hasDynamicPartitionNames());
+ Assertions.assertFalse(spec.isTemporary());
+ Assertions.assertFalse(spec.isAutoDetect());
+ Assertions.assertEquals(2, spec.getStaticPartitionValues().size());
+ Assertions.assertTrue(spec.getStaticPartitionValues().containsKey("dt"));
+ Assertions.assertTrue(spec.getStaticPartitionValues().containsKey("region"));
+ }
+
+ @Test
+ public void testStaticPartitionTemporary() {
+ Map<String, Expression> staticValues = ImmutableMap.of(
+ "year", new IntegerLiteral(2025));
+ InsertPartitionSpec spec =
InsertPartitionSpec.staticPartition(staticValues, true);
+
+ Assertions.assertTrue(spec.isStaticPartition());
+ Assertions.assertFalse(spec.hasDynamicPartitionNames());
+ Assertions.assertTrue(spec.isTemporary());
+ Assertions.assertFalse(spec.isAutoDetect());
+ Assertions.assertEquals(1, spec.getStaticPartitionValues().size());
+ }
+
+ @Test
+ public void testStaticPartitionEmpty() {
+ Map<String, Expression> staticValues = ImmutableMap.of();
+ InsertPartitionSpec spec =
InsertPartitionSpec.staticPartition(staticValues, false);
+
+ Assertions.assertFalse(spec.isStaticPartition());
+ Assertions.assertFalse(spec.hasDynamicPartitionNames());
+ Assertions.assertTrue(spec.getStaticPartitionValues().isEmpty());
+ }
+
+ @Test
+ public void testStaticPartitionNull() {
+ InsertPartitionSpec spec = InsertPartitionSpec.staticPartition(null,
false);
+
+ Assertions.assertFalse(spec.isStaticPartition());
+ Assertions.assertTrue(spec.getStaticPartitionValues().isEmpty());
+ }
+
+ @Test
+ public void testImmutability() {
+ // Test that returned collections are immutable
+ Map<String, Expression> staticValues = ImmutableMap.of(
+ "dt", new StringLiteral("2025-01-25"));
+ List<String> partitionNames = ImmutableList.of("p1");
+
+ InsertPartitionSpec staticSpec = InsertPartitionSpec.staticPartition(staticValues, false);
+ InsertPartitionSpec dynamicSpec = InsertPartitionSpec.dynamicPartition(partitionNames, false);
+
+ // Verify collections are immutable (should throw UnsupportedOperationException)
+ Assertions.assertThrows(UnsupportedOperationException.class, () -> {
+ staticSpec.getStaticPartitionValues().put("new_key", new StringLiteral("value"));
+ });
+
+ Assertions.assertThrows(UnsupportedOperationException.class, () -> {
+ dynamicSpec.getPartitionNames().add("p2");
+ });
+ }
+
+ @Test
+ public void testMutualExclusivity() {
+ // Static partition should not have dynamic partition names
+ Map<String, Expression> staticValues = ImmutableMap.of(
+ "dt", new StringLiteral("2025-01-25"));
+ InsertPartitionSpec staticSpec =
InsertPartitionSpec.staticPartition(staticValues, false);
+ Assertions.assertTrue(staticSpec.isStaticPartition());
+ Assertions.assertFalse(staticSpec.hasDynamicPartitionNames());
+ Assertions.assertFalse(staticSpec.isAutoDetect());
+
+ // Dynamic partition should not be static
+ List<String> partitionNames = ImmutableList.of("p1", "p2");
+ InsertPartitionSpec dynamicSpec =
InsertPartitionSpec.dynamicPartition(partitionNames, false);
+ Assertions.assertFalse(dynamicSpec.isStaticPartition());
+ Assertions.assertTrue(dynamicSpec.hasDynamicPartitionNames());
+ Assertions.assertFalse(dynamicSpec.isAutoDetect());
+
+ // Auto-detect should not be static or have dynamic names
+ InsertPartitionSpec autoSpec = InsertPartitionSpec.autoDetect(false);
+ Assertions.assertFalse(autoSpec.isStaticPartition());
+ Assertions.assertFalse(autoSpec.hasDynamicPartitionNames());
+ Assertions.assertTrue(autoSpec.isAutoDetect());
+
+ // None should have nothing
+ InsertPartitionSpec noneSpec = InsertPartitionSpec.none();
+ Assertions.assertFalse(noneSpec.isStaticPartition());
+ Assertions.assertFalse(noneSpec.hasDynamicPartitionNames());
+ Assertions.assertFalse(noneSpec.isAutoDetect());
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/ParseInsertPartitionSpecTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/ParseInsertPartitionSpecTest.java
new file mode 100644
index 00000000000..185141d6875
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/ParseInsertPartitionSpecTest.java
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.parser;
+
+import org.apache.doris.nereids.DorisParser;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.literal.StringLikeLiteral;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Maps;
+import mockit.Mock;
+import mockit.MockUp;
+import org.antlr.v4.runtime.ParserRuleContext;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+
+/**
+ * Unit tests for LogicalPlanBuilder.parseInsertPartitionSpec() method.
+ */
+public class ParseInsertPartitionSpecTest {
+
+ private static LogicalPlanBuilder logicalPlanBuilder;
+
+ @BeforeAll
+ public static void init() {
+ ConnectContext ctx = new ConnectContext();
+ new MockUp<ConnectContext>() {
+ @Mock
+ public ConnectContext get() {
+ return ctx;
+ }
+ };
+ }
+
+ /**
+ * Helper method to invoke the private parseInsertPartitionSpec method via
+ * reflection.
+ * Uses reflection to find the method with PartitionSpecContext parameter.
+ */
+ private InsertPartitionSpec invokeParseInsertPartitionSpec(Object ctx) throws Exception {
+ logicalPlanBuilder = new LogicalPlanBuilder(Maps.newHashMap());
+ // Find the parseInsertPartitionSpec method - it takes PartitionSpecContext as parameter
+ Method targetMethod = null;
+ for (Method method : LogicalPlanBuilder.class.getDeclaredMethods()) {
+ if (method.getName().equals("parseInsertPartitionSpec")) {
+ targetMethod = method;
+ break;
+ }
+ }
+ if (targetMethod == null) {
+ throw new NoSuchMethodException("parseInsertPartitionSpec method not found");
+ }
+ targetMethod.setAccessible(true);
+ return (InsertPartitionSpec) targetMethod.invoke(logicalPlanBuilder, ctx);
+ }
+
+ /**
+ * Helper method to parse SQL and extract PartitionSpecContext using reflection.
+ */
+ private Object parsePartitionSpec(String insertSql) throws Exception {
+ // Use NereidsParser.toAst() to parse the SQL and get the AST
+ ParserRuleContext tree = NereidsParser.toAst(
+ insertSql, DorisParser::singleStatement);
+
+ // The tree is a SingleStatementContext, which contains a StatementContext
+ // which contains a StatementBaseContext which contains an InsertTableContext
+ // Use reflection to navigate the AST structure
+ Method getChildMethod = ParserRuleContext.class.getMethod("getChild",
int.class);
+
+ // Get statement from singleStatement (index 0)
+ Object statement = getChildMethod.invoke(tree, 0);
+
+ // Get statementBase from statement (index 0)
+ Object statementBase = getChildMethod.invoke(statement, 0);
+
+ // Get insertTable from statementBase (index 0)
+ Object insertTableCtx = getChildMethod.invoke(statementBase, 0);
+
+ // Get partitionSpec() from insertTableCtx using the method
+ Method partitionSpecMethod = insertTableCtx.getClass().getMethod("partitionSpec");
+ return partitionSpecMethod.invoke(insertTableCtx);
+ }
+
+ @Test
+ public void testParseNullContext() throws Exception {
+ InsertPartitionSpec spec = invokeParseInsertPartitionSpec(null);
+
+ Assertions.assertFalse(spec.isStaticPartition());
+ Assertions.assertFalse(spec.hasDynamicPartitionNames());
+ Assertions.assertFalse(spec.isTemporary());
+ Assertions.assertFalse(spec.isAutoDetect());
+ }
+
+ @Test
+ public void testParseAutoDetect() throws Exception {
+ // Parse: INSERT INTO tbl PARTITION (*) SELECT ...
+ String sql = "INSERT INTO tbl PARTITION (*) SELECT * FROM src";
+ Object ctx = parsePartitionSpec(sql);
+
+ InsertPartitionSpec spec = invokeParseInsertPartitionSpec(ctx);
+
+ Assertions.assertFalse(spec.isStaticPartition());
+ Assertions.assertFalse(spec.hasDynamicPartitionNames());
+ Assertions.assertFalse(spec.isTemporary());
+ Assertions.assertTrue(spec.isAutoDetect());
+ }
+
+ @Test
+ public void testParseDynamicPartitionSingle() throws Exception {
+ // Parse: INSERT INTO tbl PARTITION p1 SELECT ...
+ String sql = "INSERT INTO tbl PARTITION p1 SELECT * FROM src";
+ Object ctx = parsePartitionSpec(sql);
+
+ InsertPartitionSpec spec = invokeParseInsertPartitionSpec(ctx);
+
+ Assertions.assertFalse(spec.isStaticPartition());
+ Assertions.assertTrue(spec.hasDynamicPartitionNames());
+ Assertions.assertFalse(spec.isTemporary());
+ Assertions.assertFalse(spec.isAutoDetect());
+ Assertions.assertEquals(1, spec.getPartitionNames().size());
+ Assertions.assertEquals("p1", spec.getPartitionNames().get(0));
+ }
+
+ @Test
+ public void testParseDynamicPartitionList() throws Exception {
+ // Parse: INSERT INTO tbl PARTITION (p1, p2, p3) SELECT ...
+ String sql = "INSERT INTO tbl PARTITION (p1, p2, p3) SELECT * FROM
src";
+ Object ctx = parsePartitionSpec(sql);
+
+ InsertPartitionSpec spec = invokeParseInsertPartitionSpec(ctx);
+
+ Assertions.assertFalse(spec.isStaticPartition());
+ Assertions.assertTrue(spec.hasDynamicPartitionNames());
+ Assertions.assertFalse(spec.isTemporary());
+ Assertions.assertFalse(spec.isAutoDetect());
+ Assertions.assertEquals(3, spec.getPartitionNames().size());
+ Assertions.assertTrue(spec.getPartitionNames().contains("p1"));
+ Assertions.assertTrue(spec.getPartitionNames().contains("p2"));
+ Assertions.assertTrue(spec.getPartitionNames().contains("p3"));
+ }
+
+ @Test
+ public void testParseTemporaryPartition() throws Exception {
+ // Parse: INSERT INTO tbl TEMPORARY PARTITION (p1) SELECT ...
+ String sql = "INSERT INTO tbl TEMPORARY PARTITION (p1) SELECT * FROM
src";
+ Object ctx = parsePartitionSpec(sql);
+
+ InsertPartitionSpec spec = invokeParseInsertPartitionSpec(ctx);
+
+ Assertions.assertFalse(spec.isStaticPartition());
+ Assertions.assertTrue(spec.hasDynamicPartitionNames());
+ Assertions.assertTrue(spec.isTemporary());
+ Assertions.assertFalse(spec.isAutoDetect());
+ }
+
+ @Test
+ public void testParseStaticPartitionStringValue() throws Exception {
+ // Parse: INSERT OVERWRITE TABLE tbl PARTITION (dt='2025-01-25') SELECT ...
+ String sql = "INSERT OVERWRITE TABLE tbl PARTITION (dt='2025-01-25') SELECT * FROM src";
+ Object ctx = parsePartitionSpec(sql);
+
+ InsertPartitionSpec spec = invokeParseInsertPartitionSpec(ctx);
+
+ Assertions.assertTrue(spec.isStaticPartition());
+ Assertions.assertFalse(spec.hasDynamicPartitionNames());
+ Assertions.assertFalse(spec.isTemporary());
+ Assertions.assertFalse(spec.isAutoDetect());
+
+ Map<String, Expression> staticValues = spec.getStaticPartitionValues();
+ Assertions.assertEquals(1, staticValues.size());
+ Assertions.assertTrue(staticValues.containsKey("dt"));
+ Assertions.assertTrue(staticValues.get("dt") instanceof
StringLikeLiteral);
+ Assertions.assertEquals("2025-01-25", ((StringLikeLiteral)
staticValues.get("dt")).getStringValue());
+ }
+
+ @Test
+ public void testParseStaticPartitionMultipleValues() throws Exception {
+ // Parse: INSERT OVERWRITE TABLE tbl PARTITION (dt='2025-01-25', region='bj') SELECT ...
+ String sql = "INSERT OVERWRITE TABLE tbl PARTITION (dt='2025-01-25', region='bj') SELECT * FROM src";
+ Object ctx = parsePartitionSpec(sql);
+
+ InsertPartitionSpec spec = invokeParseInsertPartitionSpec(ctx);
+
+ Assertions.assertTrue(spec.isStaticPartition());
+ Assertions.assertFalse(spec.hasDynamicPartitionNames());
+
+ Map<String, Expression> staticValues = spec.getStaticPartitionValues();
+ Assertions.assertEquals(2, staticValues.size());
+ Assertions.assertTrue(staticValues.containsKey("dt"));
+ Assertions.assertTrue(staticValues.containsKey("region"));
+ Assertions.assertEquals("2025-01-25", ((StringLikeLiteral)
staticValues.get("dt")).getStringValue());
+ Assertions.assertEquals("bj", ((StringLikeLiteral)
staticValues.get("region")).getStringValue());
+ }
+
+ @Test
+ public void testParseStaticPartitionIntegerValue() throws Exception {
+ // Parse: INSERT OVERWRITE TABLE tbl PARTITION (year=2025) SELECT ...
+ String sql = "INSERT OVERWRITE TABLE tbl PARTITION (year=2025) SELECT
* FROM src";
+ Object ctx = parsePartitionSpec(sql);
+
+ InsertPartitionSpec spec = invokeParseInsertPartitionSpec(ctx);
+
+ Assertions.assertTrue(spec.isStaticPartition());
+
+ Map<String, Expression> staticValues = spec.getStaticPartitionValues();
+ Assertions.assertEquals(1, staticValues.size());
+ Assertions.assertTrue(staticValues.containsKey("year"));
+ // Note: The parser may parse integers as IntegerLikeLiteral
+ Expression yearExpr = staticValues.get("year");
+ Assertions.assertNotNull(yearExpr);
+ }
+
+ @Test
+ public void testParseNoPartition() throws Exception {
+ // Parse: INSERT INTO tbl SELECT ... (no partition clause)
+ String sql = "INSERT INTO tbl SELECT * FROM src";
+ Object ctx = parsePartitionSpec(sql);
+
+ InsertPartitionSpec spec = invokeParseInsertPartitionSpec(ctx);
+
+ // ctx should be null for no partition
+ Assertions.assertFalse(spec.isStaticPartition());
+ Assertions.assertFalse(spec.hasDynamicPartitionNames());
+ Assertions.assertFalse(spec.isTemporary());
+ Assertions.assertFalse(spec.isAutoDetect());
+ }
+
+ @Test
+ public void testParseStaticPartitionMixedTypes() throws Exception {
+ // Parse: INSERT OVERWRITE TABLE tbl PARTITION (year=2025, month='01', day=25) SELECT ...
+ String sql = "INSERT OVERWRITE TABLE tbl PARTITION (year=2025, month='01', day=25) SELECT * FROM src";
+ Object ctx = parsePartitionSpec(sql);
+
+ InsertPartitionSpec spec = invokeParseInsertPartitionSpec(ctx);
+
+ Assertions.assertTrue(spec.isStaticPartition());
+ Assertions.assertFalse(spec.hasDynamicPartitionNames());
+
+ Map<String, Expression> staticValues = spec.getStaticPartitionValues();
+ Assertions.assertEquals(3, staticValues.size());
+ Assertions.assertTrue(staticValues.containsKey("year"));
+ Assertions.assertTrue(staticValues.containsKey("month"));
+ Assertions.assertTrue(staticValues.containsKey("day"));
+
+ // month should be string
+ Assertions.assertTrue(staticValues.get("month") instanceof
StringLikeLiteral);
+ Assertions.assertEquals("01", ((StringLikeLiteral)
staticValues.get("month")).getStringValue());
+ }
+}
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index d218db5dba5..f151c58917b 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -426,6 +426,10 @@ struct TIcebergTableSink {
12: optional string original_output_path
13: optional PlanNodes.TFileCompressType compression_type
14: optional list<Types.TNetworkAddress> broker_addresses;
+ // Static partition values for static partition overwrite
+ // Key: partition column name, Value: partition value as string
+ // When set, BE should use these values directly instead of computing from data
+ 15: optional map<string, string> static_partition_values;
}
enum TDictLayoutType {
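A hedged illustration of the shape this new field takes on the FE side; the thrift-generated class and package name (`org.apache.doris.thrift.TIcebergTableSink`) are assumptions about the gensrc output, while the setter matches the call made in `IcebergTableSink.java` above:

```java
// Sketch only: shows what static_partition_values carries in the two modes.
import java.util.HashMap;
import java.util.Map;

import org.apache.doris.thrift.TIcebergTableSink;

public class StaticPartitionThriftSketch {
    public static void main(String[] args) {
        TIcebergTableSink tSink = new TIcebergTableSink();

        // Full static mode: every partition column is pinned, so the BE routes
        // all rows to this single partition without computing values from data.
        Map<String, String> fullStatic = new HashMap<>();
        fullStatic.put("dt", "2025-01-25");
        fullStatic.put("region", "bj");
        tSink.setStaticPartitionValues(fullStatic);

        // Hybrid mode would carry only the pinned columns, e.g. {"dt": "2025-01-25"};
        // the remaining partition values still come from the row data.
    }
}
```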
diff --git
a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_static_partition_overwrite.out
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_static_partition_overwrite.out
new file mode 100644
index 00000000000..58cd1fa5ca4
--- /dev/null
+++
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_static_partition_overwrite.out
@@ -0,0 +1,135 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !q01 --
+1 Alice 2025-01-25 bj
+2 Bob 2025-01-25 sh
+3 Charlie 2025-01-26 bj
+4 David 2025-01-26 sh
+
+-- !q02 --
+10 Eve 2025-01-25 bj
+2 Bob 2025-01-25 sh
+3 Charlie 2025-01-26 bj
+4 David 2025-01-26 sh
+
+-- !q03_before --
+1 Alice 2025-01-25 bj
+2 Bob 2025-01-25 sh
+3 Charlie 2025-01-26 bj
+4 David 2025-01-26 sh
+
+-- !q03_after --
+10 Eve 2025-01-25 bj
+11 Frank 2025-01-25 sh
+12 Grace 2025-01-25 gz
+3 Charlie 2025-01-26 bj
+4 David 2025-01-26 sh
+
+-- !q03_partition_25 --
+10 Eve 2025-01-25 bj
+11 Frank 2025-01-25 sh
+12 Grace 2025-01-25 gz
+
+-- !q03_partition_26 --
+3 Charlie 2025-01-26 bj
+4 David 2025-01-26 sh
+
+-- !q04 --
+2 Bob 2025-01-25 sh
+3 Charlie 2025-01-26 bj
+
+-- !q05 --
+10 Eve 2025-01-25 bj 100
+2 Bob 2025-01-25 bj 200
+3 Charlie 2025-01-25 sh 100
+4 David 2025-01-26 bj 100
+
+-- !q06_before --
+1 Alice 2025-01-25 bj food
+2 Bob 2025-01-25 bj drink
+3 Charlie 2025-01-25 sh food
+4 David 2025-01-26 bj food
+
+-- !q06_after --
+10 Eve 2025-01-25 bj electronics
+11 Frank 2025-01-25 bj clothing
+3 Charlie 2025-01-25 sh food
+4 David 2025-01-26 bj food
+
+-- !q06_static_partition --
+10 Eve 2025-01-25 bj electronics
+11 Frank 2025-01-25 bj clothing
+
+-- !q06_other_partitions --
+3 Charlie 2025-01-25 sh food
+4 David 2025-01-26 bj food
+
+-- !q07 --
+10 Eve 1706140800000
+2 Bob 1706227200000
+3 Charlie 1706313600000
+
+-- !q08 --
+10 Eve 1706140800000 bj
+11 Frank 1706140800000 sh
+3 Charlie 1706227200000 bj
+
+-- !q09 --
+10 Eve 85.5
+2 Bob 90.0
+3 Charlie 75.5
+
+-- !q10 --
+10 Eve 85.5 1
+11 Frank 85.5 2
+3 Charlie 90.0 1
+
+-- !q11 --
+10 Eve 99.98999999999999
+2 Bob 199.99
+3 Charlie 299.99
+
+-- !q12 --
+10 Eve 99.98999999999999 A
+11 Frank 99.98999999999999 B
+3 Charlie 199.99 A
+
+-- !q13 --
+10 Eve true
+2 Bob false
+
+-- !q14 --
+10 Eve true 1
+11 Frank true 2
+3 Charlie false 1
+
+-- !q15 --
+10 Eve 2025-01-25T10:00
+2 Bob 2025-01-25T11:00
+3 Charlie 2025-01-26T10:00
+
+-- !q16 --
+10 Eve 2025-01-25T10:00 bj
+11 Frank 2025-01-25T10:00 sh
+3 Charlie 2025-01-26T10:00 bj
+
+-- !q17 --
+10 Eve 100.50
+2 Bob 200.75
+3 Charlie 300.25
+
+-- !q18 --
+10 Eve 100.50 10
+11 Frank 100.50 20
+3 Charlie 200.75 10
+
+-- !q19 --
+10 Eve 1 85.5 true A
+2 Bob 1 85.5 true B
+3 Charlie 2 90.0 false A
+4 David 1 85.5 false A
+
+-- !q20 --
+10 Eve 1 85.5 true A
+11 Frank 1 85.5 false B
+4 David 2 90.0 true A
+
diff --git
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_static_partition_overwrite.groovy
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_static_partition_overwrite.groovy
new file mode 100644
index 00000000000..d79093b69bf
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_static_partition_overwrite.groovy
@@ -0,0 +1,1086 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_static_partition_overwrite",
"p0,external,iceberg,external_docker,external_docker_iceberg") {
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable iceberg test.")
+ return
+ }
+
+ String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+ String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String catalog_name = "test_iceberg_static_partition_overwrite"
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """
+ CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://${externalEnvIp}:${rest_port}',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+ "s3.region" = "us-east-1"
+ );"""
+
+ sql """ switch ${catalog_name} """
+
+ String db1 = catalog_name + "_db"
+ String tb1 = db1 + "_tb1"
+
+ sql """ drop database if exists ${db1} force"""
+ sql """ create database ${db1} """
+ sql """ use ${db1} """
+
+ // Test Case 1: Full static partition overwrite (all partition columns specified)
+ // Test overwriting a specific partition with all partition columns specified
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ dt DATE,
+ region STRING
+ ) ENGINE=iceberg
+ PARTITION BY LIST (dt, region) ()
+ """
+
+ // Insert initial data
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', '2025-01-25', 'bj'),
+ (2, 'Bob', '2025-01-25', 'sh'),
+ (3, 'Charlie', '2025-01-26', 'bj'),
+ (4, 'David', '2025-01-26', 'sh')
+ """
+
+ // Verify initial data
+ order_qt_q01 """ SELECT * FROM ${tb1} ORDER BY id """
+
+ // Overwrite specific partition (dt='2025-01-25', region='bj')
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (dt='2025-01-25', region='bj')
+ SELECT 10, 'Eve'
+ """
+
+ // Verify: Only (dt='2025-01-25', region='bj') partition is overwritten
+ // Other partitions remain unchanged
+ order_qt_q02 """ SELECT * FROM ${tb1} ORDER BY id """
+
+ // Test Case 2: Hybrid partition mode - partial static + partial dynamic
+ // Static partition column (dt) comes from PARTITION clause
+ // Dynamic partition column (region) comes from SELECT query result
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ dt DATE,
+ region STRING
+ ) ENGINE=iceberg
+ PARTITION BY LIST (dt, region) ()
+ """
+
+ // Create source table for hybrid partition test
+ String tb_src = db1 + "_src"
+ sql """ DROP TABLE IF EXISTS ${tb_src} """
+ sql """
+ CREATE TABLE ${tb_src} (
+ id BIGINT,
+ name STRING,
+ region STRING
+ ) ENGINE=iceberg
+ """
+
+ // Insert source data with different regions
+ sql """
+ INSERT INTO ${tb_src} VALUES
+ (10, 'Eve', 'bj'),
+ (11, 'Frank', 'sh'),
+ (12, 'Grace', 'gz')
+ """
+
+ // Insert initial data to target table
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', '2025-01-25', 'bj'),
+ (2, 'Bob', '2025-01-25', 'sh'),
+ (3, 'Charlie', '2025-01-26', 'bj'),
+ (4, 'David', '2025-01-26', 'sh')
+ """
+
+ // Verify initial data
+ order_qt_q03_before """ SELECT * FROM ${tb1} ORDER BY id """
+
+ // Hybrid mode: dt='2025-01-25' is static, region comes from source table dynamically
+ // This should:
+ // 1. Delete all data where dt='2025-01-25'
+ // 2. Insert new data with dt='2025-01-25' and region values from source table
+ // Note: SELECT does NOT include 'dt' column - it comes from PARTITION clause
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (dt='2025-01-25')
+ SELECT id, name, region FROM ${tb_src}
+ """
+
+ // Verify:
+ // - Partitions with dt='2025-01-25' are replaced with new data (bj, sh, gz regions)
+ // - Partitions with dt='2025-01-26' remain unchanged
+ order_qt_q03_after """ SELECT * FROM ${tb1} ORDER BY id """
+
+ // Verify partition data distribution
+ order_qt_q03_partition_25 """ SELECT * FROM ${tb1} WHERE dt='2025-01-25'
ORDER BY id """
+ order_qt_q03_partition_26 """ SELECT * FROM ${tb1} WHERE dt='2025-01-26'
ORDER BY id """
+
+ sql """ DROP TABLE IF EXISTS ${tb_src} """
+
+ // Test Case 3: Empty result overwrite (delete specified partition)
+ // Test deleting a partition by overwriting with empty result
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ dt DATE,
+ region STRING
+ ) ENGINE=iceberg
+ PARTITION BY LIST (dt, region) ()
+ """
+
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', '2025-01-25', 'bj'),
+ (2, 'Bob', '2025-01-25', 'sh'),
+ (3, 'Charlie', '2025-01-26', 'bj')
+ """
+
+ // Overwrite with empty result to delete the specified partition
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (dt='2025-01-25', region='bj')
+ SELECT id, name FROM ${tb1} WHERE 1=0
+ """
+
+ // Verify: Specified partition is deleted, other partitions remain
unchanged
+ order_qt_q04 """ SELECT * FROM ${tb1} ORDER BY id """
+
+ // Test Case 4: Error scenario - non-existent partition column
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ dt DATE,
+ region STRING
+ ) ENGINE=iceberg
+ PARTITION BY LIST (dt, region) ()
+ """
+
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (invalid_col='value')
+ SELECT * FROM ${tb1}
+ """
+ exception "Unknown partition column"
+ }
+
+ // Test Case 5: Multiple static partitions with different data types (full
static)
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ dt DATE,
+ region STRING,
+ amount INT
+ ) ENGINE=iceberg
+ PARTITION BY LIST (dt, region, amount) ()
+ """
+
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', '2025-01-25', 'bj', 100),
+ (2, 'Bob', '2025-01-25', 'bj', 200),
+ (3, 'Charlie', '2025-01-25', 'sh', 100),
+ (4, 'David', '2025-01-26', 'bj', 100)
+ """
+
+ // Overwrite partition with multiple partition columns including integer
type
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (dt='2025-01-25', region='bj', amount=100)
+ SELECT 10, 'Eve'
+ """
+
+ // Verify: Only the exact matching partition is overwritten
+ order_qt_q05 """ SELECT * FROM ${tb1} ORDER BY id """
+
+ // Test Case 6: Hybrid mode with three partition columns (2 static + 1
dynamic)
+ // Table has partition columns: (dt, region, category)
+ // Static: dt='2025-01-25', region='bj'
+ // Dynamic: category comes from SELECT
+ String tb2 = db1 + "_tb2"
+ sql """ DROP TABLE IF EXISTS ${tb2} """
+ sql """
+ CREATE TABLE ${tb2} (
+ id BIGINT,
+ name STRING,
+ dt DATE,
+ region STRING,
+ category STRING
+ ) ENGINE=iceberg
+ PARTITION BY LIST (dt, region, category) ()
+ """
+
+ // Insert initial data with different categories
+ sql """
+ INSERT INTO ${tb2} VALUES
+ (1, 'Alice', '2025-01-25', 'bj', 'food'),
+ (2, 'Bob', '2025-01-25', 'bj', 'drink'),
+ (3, 'Charlie', '2025-01-25', 'sh', 'food'),
+ (4, 'David', '2025-01-26', 'bj', 'food')
+ """
+
+ order_qt_q06_before """ SELECT * FROM ${tb2} ORDER BY id """
+
+ // Create source table for dynamic category values
+ String tb2_src = db1 + "_tb2_src"
+ sql """ DROP TABLE IF EXISTS ${tb2_src} """
+ sql """
+ CREATE TABLE ${tb2_src} (
+ id BIGINT,
+ name STRING,
+ category STRING
+ ) ENGINE=iceberg
+ """
+
+ sql """
+ INSERT INTO ${tb2_src} VALUES
+ (10, 'Eve', 'electronics'),
+ (11, 'Frank', 'clothing')
+ """
+
+ // Hybrid mode: dt and region are static, category is dynamic from source
+ // SELECT should only include: id, name, category (not dt, region)
+ sql """
+ INSERT OVERWRITE TABLE ${tb2}
+ PARTITION (dt='2025-01-25', region='bj')
+ SELECT id, name, category FROM ${tb2_src}
+ """
+
+ // Verify:
+ // - All partitions with dt='2025-01-25' AND region='bj' are replaced
+ // - Other partitions remain unchanged
+ order_qt_q06_after """ SELECT * FROM ${tb2} ORDER BY id """
+
+ // Verify specific partition filters
+ order_qt_q06_static_partition """ SELECT * FROM ${tb2} WHERE
dt='2025-01-25' AND region='bj' ORDER BY id """
+ order_qt_q06_other_partitions """ SELECT * FROM ${tb2} WHERE NOT
(dt='2025-01-25' AND region='bj') ORDER BY id """
+
+ sql """ DROP TABLE IF EXISTS ${tb2_src} """
+ sql """ DROP TABLE IF EXISTS ${tb2} """
+
+ // Test Case 7: Static partition with LONG type
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ timestamp_val BIGINT
+ ) ENGINE=iceberg
+ PARTITION BY LIST (timestamp_val) ()
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', 1706140800000),
+ (2, 'Bob', 1706227200000),
+ (3, 'Charlie', 1706313600000)
+ """
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (timestamp_val=1706140800000)
+ SELECT 10, 'Eve'
+ """
+ order_qt_q07 """ SELECT * FROM ${tb1} ORDER BY id """
+
+ // Test Case 8: Hybrid mode with LONG type (static) + STRING (dynamic)
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ timestamp_val BIGINT,
+ region STRING
+ ) ENGINE=iceberg
+ PARTITION BY LIST (timestamp_val, region) ()
+ """
+ String tb_long_src = db1 + "_long_src"
+ sql """ DROP TABLE IF EXISTS ${tb_long_src} """
+ sql """
+ CREATE TABLE ${tb_long_src} (
+ id BIGINT,
+ name STRING,
+ region STRING
+ ) ENGINE=iceberg
+ """
+ sql """
+ INSERT INTO ${tb_long_src} VALUES
+ (10, 'Eve', 'bj'),
+ (11, 'Frank', 'sh')
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', 1706140800000, 'bj'),
+ (2, 'Bob', 1706140800000, 'sh'),
+ (3, 'Charlie', 1706227200000, 'bj')
+ """
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (timestamp_val=1706140800000)
+ SELECT id, name, region FROM ${tb_long_src}
+ """
+ order_qt_q08 """ SELECT * FROM ${tb1} ORDER BY id """
+ sql """ DROP TABLE IF EXISTS ${tb_long_src} """
+
+ // Test Case 9: Static partition with FLOAT type
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ score FLOAT
+ ) ENGINE=iceberg
+ PARTITION BY LIST (score) ()
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', 85.5),
+ (2, 'Bob', 90.0),
+ (3, 'Charlie', 75.5)
+ """
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (score=85.5)
+ SELECT 10, 'Eve'
+ """
+ order_qt_q09 """ SELECT * FROM ${tb1} ORDER BY id """
+
+ // Test Case 10: Hybrid mode with FLOAT type (static) + INTEGER (dynamic)
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ score FLOAT,
+ level INT
+ ) ENGINE=iceberg
+ PARTITION BY LIST (score, level) ()
+ """
+ String tb_float_src = db1 + "_float_src"
+ sql """ DROP TABLE IF EXISTS ${tb_float_src} """
+ sql """
+ CREATE TABLE ${tb_float_src} (
+ id BIGINT,
+ name STRING,
+ level INT
+ ) ENGINE=iceberg
+ """
+ sql """
+ INSERT INTO ${tb_float_src} VALUES
+ (10, 'Eve', 1),
+ (11, 'Frank', 2)
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', 85.5, 1),
+ (2, 'Bob', 85.5, 2),
+ (3, 'Charlie', 90.0, 1)
+ """
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (score=85.5)
+ SELECT id, name, level FROM ${tb_float_src}
+ """
+ order_qt_q10 """ SELECT * FROM ${tb1} ORDER BY id """
+ sql """ DROP TABLE IF EXISTS ${tb_float_src} """
+
+ // Test Case 11: Static partition with DOUBLE type
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ price DOUBLE
+ ) ENGINE=iceberg
+ PARTITION BY LIST (price) ()
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', 99.99),
+ (2, 'Bob', 199.99),
+ (3, 'Charlie', 299.99)
+ """
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (price=99.99)
+ SELECT 10, 'Eve'
+ """
+ order_qt_q11 """ SELECT * FROM ${tb1} ORDER BY id """
+
+ // Test Case 12: Hybrid mode with DOUBLE type (static) + STRING (dynamic)
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ price DOUBLE,
+ category STRING
+ ) ENGINE=iceberg
+ PARTITION BY LIST (price, category) ()
+ """
+ String tb_double_src = db1 + "_double_src"
+ sql """ DROP TABLE IF EXISTS ${tb_double_src} """
+ sql """
+ CREATE TABLE ${tb_double_src} (
+ id BIGINT,
+ name STRING,
+ category STRING
+ ) ENGINE=iceberg
+ """
+ sql """
+ INSERT INTO ${tb_double_src} VALUES
+ (10, 'Eve', 'A'),
+ (11, 'Frank', 'B')
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', 99.99, 'A'),
+ (2, 'Bob', 99.99, 'B'),
+ (3, 'Charlie', 199.99, 'A')
+ """
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (price=99.99)
+ SELECT id, name, category FROM ${tb_double_src}
+ """
+ order_qt_q12 """ SELECT * FROM ${tb1} ORDER BY id """
+ sql """ DROP TABLE IF EXISTS ${tb_double_src} """
+
+ // Test Case 13: Static partition with BOOLEAN type
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ is_active BOOLEAN
+ ) ENGINE=iceberg
+ PARTITION BY LIST (is_active) ()
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', true),
+ (2, 'Bob', false),
+ (3, 'Charlie', true)
+ """
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (is_active=true)
+ SELECT 10, 'Eve'
+ """
+ order_qt_q13 """ SELECT * FROM ${tb1} ORDER BY id """
+
+ // Test Case 14: Hybrid mode with BOOLEAN type (static) + INTEGER (dynamic)
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ is_active BOOLEAN,
+ status INT
+ ) ENGINE=iceberg
+ PARTITION BY LIST (is_active, status) ()
+ """
+ String tb_bool_src = db1 + "_bool_src"
+ sql """ DROP TABLE IF EXISTS ${tb_bool_src} """
+ sql """
+ CREATE TABLE ${tb_bool_src} (
+ id BIGINT,
+ name STRING,
+ status INT
+ ) ENGINE=iceberg
+ """
+ sql """
+ INSERT INTO ${tb_bool_src} VALUES
+ (10, 'Eve', 1),
+ (11, 'Frank', 2)
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', true, 1),
+ (2, 'Bob', true, 2),
+ (3, 'Charlie', false, 1)
+ """
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (is_active=true)
+ SELECT id, name, status FROM ${tb_bool_src}
+ """
+ order_qt_q14 """ SELECT * FROM ${tb1} ORDER BY id """
+ sql """ DROP TABLE IF EXISTS ${tb_bool_src} """
+
+ // Test Case 15: Static partition with DATETIME type
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ ts DATETIME
+ ) ENGINE=iceberg
+ PARTITION BY LIST (ts) ()
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', '2025-01-25 10:00:00'),
+ (2, 'Bob', '2025-01-25 11:00:00'),
+ (3, 'Charlie', '2025-01-26 10:00:00')
+ """
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (ts='2025-01-25 10:00:00')
+ SELECT 10, 'Eve'
+ """
+ order_qt_q15 """ SELECT * FROM ${tb1} ORDER BY id """
+
+ // Test Case 16: Hybrid mode with DATETIME type (static) + STRING (dynamic)
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ ts DATETIME,
+ region STRING
+ ) ENGINE=iceberg
+ PARTITION BY LIST (ts, region) ()
+ """
+ String tb_ts_src = db1 + "_ts_src"
+ sql """ DROP TABLE IF EXISTS ${tb_ts_src} """
+ sql """
+ CREATE TABLE ${tb_ts_src} (
+ id BIGINT,
+ name STRING,
+ region STRING
+ ) ENGINE=iceberg
+ """
+ sql """
+ INSERT INTO ${tb_ts_src} VALUES
+ (10, 'Eve', 'bj'),
+ (11, 'Frank', 'sh')
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', '2025-01-25 10:00:00', 'bj'),
+ (2, 'Bob', '2025-01-25 10:00:00', 'sh'),
+ (3, 'Charlie', '2025-01-26 10:00:00', 'bj')
+ """
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (ts='2025-01-25 10:00:00')
+ SELECT id, name, region FROM ${tb_ts_src}
+ """
+ order_qt_q16 """ SELECT * FROM ${tb1} ORDER BY id """
+ sql """ DROP TABLE IF EXISTS ${tb_ts_src} """
+
+ // Test Case 17: Static partition with DECIMAL type
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ amount DECIMAL(10,2)
+ ) ENGINE=iceberg
+ PARTITION BY LIST (amount) ()
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', 100.50),
+ (2, 'Bob', 200.75),
+ (3, 'Charlie', 300.25)
+ """
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (amount=100.50)
+ SELECT 10, 'Eve'
+ """
+ order_qt_q17 """ SELECT * FROM ${tb1} ORDER BY id """
+
+ // Test Case 18: Hybrid mode with DECIMAL type (static) + INTEGER (dynamic)
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ amount DECIMAL(10,2),
+ quantity INT
+ ) ENGINE=iceberg
+ PARTITION BY LIST (amount, quantity) ()
+ """
+ String tb_decimal_src = db1 + "_decimal_src"
+ sql """ DROP TABLE IF EXISTS ${tb_decimal_src} """
+ sql """
+ CREATE TABLE ${tb_decimal_src} (
+ id BIGINT,
+ name STRING,
+ quantity INT
+ ) ENGINE=iceberg
+ """
+ sql """
+ INSERT INTO ${tb_decimal_src} VALUES
+ (10, 'Eve', 10),
+ (11, 'Frank', 20)
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', 100.50, 10),
+ (2, 'Bob', 100.50, 20),
+ (3, 'Charlie', 200.75, 10)
+ """
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (amount=100.50)
+ SELECT id, name, quantity FROM ${tb_decimal_src}
+ """
+ order_qt_q18 """ SELECT * FROM ${tb1} ORDER BY id """
+ sql """ DROP TABLE IF EXISTS ${tb_decimal_src} """
+
+ // Test Case 19: Multiple types in static partition (INTEGER, FLOAT,
BOOLEAN, STRING)
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ level INT,
+ score FLOAT,
+ is_active BOOLEAN,
+ category STRING
+ ) ENGINE=iceberg
+ PARTITION BY LIST (level, score, is_active, category) ()
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', 1, 85.5, true, 'A'),
+ (2, 'Bob', 1, 85.5, true, 'B'),
+ (3, 'Charlie', 2, 90.0, false, 'A'),
+ (4, 'David', 1, 85.5, false, 'A')
+ """
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (level=1, score=85.5, is_active=true, category='A')
+ SELECT 10, 'Eve'
+ """
+ order_qt_q19 """ SELECT * FROM ${tb1} ORDER BY id """
+
+ // Test Case 20: Hybrid mode with multiple types (2 static + 2 dynamic)
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ level INT,
+ score FLOAT,
+ is_active BOOLEAN,
+ category STRING
+ ) ENGINE=iceberg
+ PARTITION BY LIST (level, score, is_active, category) ()
+ """
+ String tb_multi_src = db1 + "_multi_src"
+ sql """ DROP TABLE IF EXISTS ${tb_multi_src} """
+ sql """
+ CREATE TABLE ${tb_multi_src} (
+ id BIGINT,
+ name STRING,
+ is_active BOOLEAN,
+ category STRING
+ ) ENGINE=iceberg
+ """
+ sql """
+ INSERT INTO ${tb_multi_src} VALUES
+ (10, 'Eve', true, 'A'),
+ (11, 'Frank', false, 'B')
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', 1, 85.5, true, 'A'),
+ (2, 'Bob', 1, 85.5, true, 'B'),
+ (3, 'Charlie', 1, 85.5, false, 'A'),
+ (4, 'David', 2, 90.0, true, 'A')
+ """
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (level=1, score=85.5)
+ SELECT id, name, is_active, category FROM ${tb_multi_src}
+ """
+ order_qt_q20 """ SELECT * FROM ${tb1} ORDER BY id """
+ sql """ DROP TABLE IF EXISTS ${tb_multi_src} """
+
+ // ============================================================================
+ // Test Cases for non-identity partition transforms - static partition overwrite should fail
+ // ============================================================================
+
+ // Test Case 21: Error scenario - bucket partition (non-identity transform)
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ category STRING
+ ) ENGINE=iceberg
+ PARTITION BY LIST (bucket(4, category)) ()
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', 'food'),
+ (2, 'Bob', 'drink')
+ """
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (category='food')
+ SELECT 10, 'Eve'
+ """
+ exception "Unknown partition column"
+ }
+ // Using correct partition field name should trigger non-identity error
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (category_bucket=0)
+ SELECT 10, 'Eve'
+ """
+ exception "Cannot use static partition syntax for non-identity
partition field"
+ }
+
+ // Test Case 22: Error scenario - truncate partition (non-identity
transform)
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ description STRING
+ ) ENGINE=iceberg
+ PARTITION BY LIST (truncate(3, description)) ()
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', 'hello world'),
+ (2, 'Bob', 'goodbye')
+ """
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (description='hello')
+ SELECT 10, 'Eve'
+ """
+ exception "Unknown partition column"
+ }
+ // Using correct partition field name should trigger non-identity error
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (description_trunc='hel')
+ SELECT 10, 'Eve'
+ """
+ exception "Cannot use static partition syntax for non-identity
partition field"
+ }
+
+ // Test Case 23: Error scenario - day partition (non-identity time
transform)
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ ts DATETIME
+ ) ENGINE=iceberg
+ PARTITION BY LIST (day(ts)) ()
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', '2025-01-25 10:00:00'),
+ (2, 'Bob', '2025-01-26 11:00:00')
+ """
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (ts='2025-01-25 10:00:00')
+ SELECT 10, 'Eve'
+ """
+ exception "Unknown partition column"
+ }
+ // Using correct partition field name should trigger non-identity error
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (ts_day='2025-01-25')
+ SELECT 10, 'Eve'
+ """
+ exception "Cannot use static partition syntax for non-identity
partition field"
+ }
+
+ // Test Case 24: Error scenario - year partition (non-identity time
transform)
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ event_date DATE
+ ) ENGINE=iceberg
+ PARTITION BY LIST (year(event_date)) ()
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', '2024-06-15'),
+ (2, 'Bob', '2025-01-25')
+ """
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (event_date='2024-06-15')
+ SELECT 10, 'Eve'
+ """
+ exception "Unknown partition column"
+ }
+ // Using correct partition field name should trigger non-identity error
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (event_date_year=2024)
+ SELECT 10, 'Eve'
+ """
+ exception "Cannot use static partition syntax for non-identity
partition field"
+ }
+
+ // Test Case 25: Error scenario - month partition (non-identity time
transform)
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ event_date DATE
+ ) ENGINE=iceberg
+ PARTITION BY LIST (month(event_date)) ()
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', '2025-01-15'),
+ (2, 'Bob', '2025-02-20')
+ """
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (event_date='2025-01-15')
+ SELECT 10, 'Eve'
+ """
+ exception "Unknown partition column"
+ }
+ // Using correct partition field name should trigger non-identity error
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (event_date_month='2025-01')
+ SELECT 10, 'Eve'
+ """
+ exception "Cannot use static partition syntax for non-identity
partition field"
+ }
+
+ // Test Case 26: Error scenario - hour partition (non-identity time
transform)
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ ts DATETIME
+ ) ENGINE=iceberg
+ PARTITION BY LIST (hour(ts)) ()
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', '2025-01-25 10:30:00'),
+ (2, 'Bob', '2025-01-25 11:45:00')
+ """
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (ts='2025-01-25 10:00:00')
+ SELECT 10, 'Eve'
+ """
+ exception "Unknown partition column"
+ }
+ // Using correct partition field name should trigger non-identity error
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (ts_hour='2025-01-25-10')
+ SELECT 10, 'Eve'
+ """
+ exception "Cannot use static partition syntax for non-identity
partition field"
+ }
+
+ // Test Case 27: Error scenario - mixed identity and non-identity
partitions (bucket)
+ // Table has identity partition (region) + non-identity partition (bucket
on id)
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ region STRING
+ ) ENGINE=iceberg
+ PARTITION BY LIST (region, bucket(4, id)) ()
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', 'bj'),
+ (2, 'Bob', 'sh')
+ """
+ // Static partition on non-identity column should fail
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (id=1)
+ SELECT 'Eve', 'bj'
+ """
+ exception "Unknown partition column"
+ }
+ // Using correct partition field name should trigger non-identity error
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (id_bucket=0)
+ SELECT 'Eve', 'bj'
+ """
+ exception "Cannot use static partition syntax for non-identity
partition field"
+ }
+
+ // Test Case 28: Error scenario - truncate on integer column
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ amount INT
+ ) ENGINE=iceberg
+ PARTITION BY LIST (truncate(100, amount)) ()
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', 150),
+ (2, 'Bob', 250)
+ """
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (amount=100)
+ SELECT 10, 'Eve'
+ """
+ exception "Unknown partition column"
+ }
+ // Using correct partition field name should trigger non-identity error
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (amount_trunc=100)
+ SELECT 10, 'Eve'
+ """
+ exception "Cannot use static partition syntax for non-identity
partition field"
+ }
+
+ // Test Case 29: Error scenario - bucket on BIGINT column
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ user_id BIGINT
+ ) ENGINE=iceberg
+ PARTITION BY LIST (bucket(8, user_id)) ()
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', 1001),
+ (2, 'Bob', 2002)
+ """
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (user_id=1001)
+ SELECT 10, 'Eve'
+ """
+ exception "Unknown partition column"
+ }
+ // Using correct partition field name should trigger non-identity error
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (user_id_bucket=0)
+ SELECT 10, 'Eve'
+ """
+ exception "Cannot use static partition syntax for non-identity
partition field"
+ }
+
+ // Test Case 30: Mixed partitions - identity column is OK, but
non-identity should fail
+ // Test that specifying only identity partition columns works,
+ // but including non-identity columns fails
+ sql """ DROP TABLE IF EXISTS ${tb1} """
+ sql """
+ CREATE TABLE ${tb1} (
+ id BIGINT,
+ name STRING,
+ region STRING,
+ ts DATETIME
+ ) ENGINE=iceberg
+ PARTITION BY LIST (region, day(ts)) ()
+ """
+ sql """
+ INSERT INTO ${tb1} VALUES
+ (1, 'Alice', 'bj', '2025-01-25 10:00:00'),
+ (2, 'Bob', 'sh', '2025-01-26 11:00:00')
+ """
+ // Specifying only identity partition column (region) - this should work normally
+ // But we need to also select ts column dynamically since day(ts) is a partition
+ // Note: This is a tricky case - with partial static partition, the non-specified
+ // partition columns should come from SELECT. But ts column must be in the query result.
+ // For simplicity, test only the error case where non-identity column is specified
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (ts='2025-01-25 10:00:00')
+ SELECT 10, 'Eve', 'bj'
+ """
+ exception "Unknown partition column"
+ }
+ // Using correct partition field name should trigger non-identity error
+ test {
+ sql """
+ INSERT OVERWRITE TABLE ${tb1}
+ PARTITION (ts_day='2025-01-25')
+ SELECT 10, 'Eve', 'bj'
+ """
+ exception "Cannot use static partition syntax for non-identity
partition field"
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]