Copilot commented on code in PR #58396:
URL: https://github.com/apache/doris/pull/58396#discussion_r2570899819


##########
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:
##########
@@ -174,9 +276,19 @@ Status VIcebergTableWriter::write(RuntimeState* state, 
vectorized::Block& block)
         Block transformed_block;
         SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
         transformed_block.reserve(_iceberg_partition_columns.size());
-        for (auto& iceberg_partition_columns : _iceberg_partition_columns) {
-            
transformed_block.insert(iceberg_partition_columns.partition_column_transform().apply(
-                    output_block, iceberg_partition_columns.source_idx()));
+        for (int i = 0; i < _iceberg_partition_columns.size(); ++i) {
+            auto& iceberg_partition_columns = _iceberg_partition_columns[i];
+            if (_has_static_partition && _partition_column_is_static[i]) {
+                auto result_type =
+                        
iceberg_partition_columns.partition_column_transform().get_result_type();
+                auto col = 
result_type->create_column_const_with_default_value(output_block.rows());

Review Comment:
   In hybrid mode (lines 281-291), when a partition column is static, a 
constant column with default values is created (line 284). However, the actual 
static partition value is never used to populate this column - it just uses the 
default value. This means the `transformed_block` contains incorrect data for 
static partition columns. While this might not affect the final output because 
`_partition_to_path` and `_partition_values` use 
`_partition_column_static_values` directly (lines 435-437, 466-468), having 
incorrect data in the intermediate transformed_block is confusing and could 
lead to bugs if the code is modified later. The constant column should be 
populated with the actual static partition value from 
`_partition_column_static_values[i]` instead of using default values.
   ```suggestion
                   // Create a column with the actual static partition value
                   auto data_col = result_type->create_column();
                   result_type->get_default_serializer()->deserialize_column(
                       {_partition_column_static_values[i].data(), 
_partition_column_static_values[i].size()},
                       *data_col, 1);
                   auto col = ColumnConst::create(std::move(data_col), 
output_block.rows());
   ```



##########
regression-test/data/external_table_p0/iceberg/write/test_iceberg_static_partition_overwrite.out:
##########
@@ -0,0 +1,104 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !q01 --
+1      Alice   2025-01-25      bj
+2      Bob     2025-01-25      sh
+3      Charlie 2025-01-26      bj
+4      David   2025-01-26      sh
+
+-- !q02 --
+10     Eve     2025-01-25      bj
+2      Bob     2025-01-25      sh
+3      Charlie 2025-01-26      bj
+4      David   2025-01-26      sh
+
+-- !q03_before --
+1      Alice   2025-01-25      bj
+2      Bob     2025-01-25      sh
+3      Charlie 2025-01-26      bj
+4      David   2025-01-26      sh
+
+-- !q03_after --
+10     Eve     2025-01-25      bj
+11     Frank   2025-01-25      sh
+12     Grace   2025-01-25      gz
+3      Charlie 2025-01-26      bj
+4      David   2025-01-26      sh
+
+-- !q03_partition_25 --
+10     Eve     2025-01-25      bj
+11     Frank   2025-01-25      sh
+12     Grace   2025-01-25      gz
+
+-- !q03_partition_26 --
+3      Charlie 2025-01-26      bj
+4      David   2025-01-26      sh
+
+-- !q04 --
+2      Bob     2025-01-25      sh
+3      Charlie 2025-01-26      bj
+
+-- !q05 --
+10     Eve     2025-01-25      bj      100
+2      Bob     2025-01-25      bj      200
+3      Charlie 2025-01-25      sh      100
+4      David   2025-01-26      bj      100
+
+-- !q06_before --
+1      Alice   2025-01-25      bj      food
+2      Bob     2025-01-25      bj      drink
+3      Charlie 2025-01-25      sh      food
+4      David   2025-01-26      bj      food
+
+-- !q06_after --
+10     Eve     2025-01-25      bj      electronics
+11     Frank   2025-01-25      bj      clothing
+3      Charlie 2025-01-25      sh      food
+4      David   2025-01-26      bj      food
+
+-- !q06_static_partition --
+10     Eve     2025-01-25      bj      electronics
+11     Frank   2025-01-25      bj      clothing
+
+-- !q06_other_partitions --
+3      Charlie 2025-01-25      sh      food
+4      David   2025-01-26      bj      food
+
+-- !q07 --
+10     Eve     1706140800000
+2      Bob     1706227200000
+3      Charlie 1706313600000
+
+-- !q08 --
+10     Eve     1706140800000   bj
+11     Frank   1706140800000   sh
+3      Charlie 1706227200000   bj
+
+-- !q09 --
+10     Eve     85.5
+2      Bob     90.0
+3      Charlie 75.5
+
+-- !q10 --
+10     Eve     85.5    1
+11     Frank   85.5    2
+3      Charlie 90.0    1
+
+-- !q11 --
+10     Eve     99.98999999999999
+2      Bob     199.99
+3      Charlie 299.99
+
+-- !q12 --
+10     Eve     99.98999999999999       A
+11     Frank   99.98999999999999       B
+3      Charlie 199.99  A
+
+-- !q13 --
+10     Eve     true
+2      Bob     false
+
+-- !q14 --
+10     Eve     true    1
+11     Frank   true    2
+3      Charlie false   1
+

Review Comment:
   The test file is missing expected output for queries q15-q20. The test 
contains Test Cases 15-20 (lines 549-741) which execute various queries (q15, 
q16, q17, q18, q19, q20), but the corresponding output file only contains 
results up to q14. This means these test cases will fail when executed.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java:
##########
@@ -797,6 +797,72 @@ public static Literal<?> parseIcebergLiteral(String value, 
org.apache.iceberg.ty
         }
     }
 
+    /**
+     * Convert human-readable partition value string to appropriate Java type 
for
+     * Iceberg expression.
+     * This is used for static partition overwrite where user specifies 
partition
+     * values like PARTITION (dt='2025-01-01', region='bj').
+     *
+     * @param valueStr    Partition value as human-readable string (e.g.,
+     *                    "2025-01-01" for date)
+     * @param icebergType Iceberg type of the partition field
+     * @return Converted value object suitable for Iceberg Expression, or null 
if
+     *         value is null
+     */
+    public static Object parsePartitionValueFromString(String valueStr, 
org.apache.iceberg.types.Type icebergType) {
+        if (valueStr == null) {
+            return null;
+        }
+
+        try {
+            switch (icebergType.typeId()) {
+                case STRING:
+                    return valueStr;
+                case INTEGER:
+                    return Integer.parseInt(valueStr);
+                case LONG:
+                    return Long.parseLong(valueStr);
+                case FLOAT:
+                    return Float.parseFloat(valueStr);
+                case DOUBLE:
+                    return Double.parseDouble(valueStr);
+                case BOOLEAN:
+                    return Boolean.parseBoolean(valueStr);
+                case DATE:
+                    // Parse date string (format: yyyy-MM-dd) to epoch day
+                    return (int) LocalDate.parse(valueStr, 
DateTimeFormatter.ISO_LOCAL_DATE).toEpochDay();
+                case TIMESTAMP:
+                    // Parse timestamp string (format: yyyy-MM-dd HH:mm:ss or 
ISO format) to
+                    // microseconds
+                    return parseTimestampToMicros(valueStr, (TimestampType) 
icebergType);
+                case DECIMAL:
+                    return new BigDecimal(valueStr);
+                default:
+                    throw new IllegalArgumentException("Unsupported partition 
value type: " + icebergType);
+            }
+        } catch (Exception e) {
+            throw new IllegalArgumentException(String.format("Failed to 
convert partition value '%s' to type %s",
+                    valueStr, icebergType), e);
+        }
+    }
+
+    private static long parseTimestampToMicros(String valueStr, TimestampType 
timestampType) {
+        LocalDateTime ldt;
+        try {
+            ldt = LocalDateTime.parse(valueStr, 
DateTimeFormatter.ISO_LOCAL_DATE_TIME);
+        } catch (Exception e) {
+            // Try alternative format: yyyy-MM-dd HH:mm:ss
+            ldt = LocalDateTime.parse(valueStr, 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+        }
+
+        // Convert to microseconds
+        if (timestampType.shouldAdjustToUTC()) {
+            return 
ldt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() * 1000;
+        } else {
+            return ldt.toInstant(ZoneOffset.UTC).toEpochMilli() * 1000;
+        }

Review Comment:
   The timestamp parsing logic has potential issues with timezone handling. On 
line 855, when the ISO format parsing fails, it falls back to parsing 
"yyyy-MM-dd HH:mm:ss" format. However, the exception from line 852 is caught 
and ignored without logging. If both formats fail, line 855 will throw an 
exception that propagates up. More importantly, the timezone handling on lines 
859-862 may be incorrect:
   - When `shouldAdjustToUTC()` is true, it converts using system default 
timezone (line 860)
   - When false, it assumes the input is already in UTC (line 862)
   
   This is confusing and could lead to incorrect timestamp values. The logic 
should be clarified or documented better to explain when each branch should be 
used.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java:
##########
@@ -605,16 +629,99 @@ private Plan 
bindIcebergTableSink(MatchingContext<UnboundIcebergTableSink<Plan>>
                 Optional.empty(),
                 Optional.empty(),
                 child);
-        // we need to insert all the columns of the target table
+
+        // Check column count: SELECT columns should match bindColumns 
(excluding static
+        // partition columns)
         if (boundSink.getCols().size() != child.getOutput().size()) {
-            throw new AnalysisException("insert into cols should be 
corresponding to the query output");
+            throw new AnalysisException("insert into cols should be 
corresponding to the query output. "
+                    + "Expected " + boundSink.getCols().size() + " columns but 
got " + child.getOutput().size());
         }
+
         Map<String, NamedExpression> columnToOutput = getColumnToOutput(ctx, 
table, false,
                 boundSink, child);
+
+        // For static partition columns, add constant expressions from 
PARTITION clause
+        // This ensures partition column values are written to the data file
+        if (!staticPartitionColNames.isEmpty()) {
+            for (Map.Entry<String, Expression> entry : 
staticPartitions.entrySet()) {
+                String colName = entry.getKey();
+                Expression valueExpr = entry.getValue();
+                Column column = table.getColumn(colName);
+                if (column != null) {
+                    // Cast the literal to the correct column type
+                    Expression castExpr = TypeCoercionUtils.castIfNotSameType(
+                            valueExpr, 
DataType.fromCatalogType(column.getType()));
+                    columnToOutput.put(colName, new Alias(castExpr, colName));
+                }
+            }
+        }
+
         LogicalProject<?> fullOutputProject = 
getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput);
         return boundSink.withChildAndUpdateOutput(fullOutputProject);
     }
 
+    /**
+     * Validate static partition specification for Iceberg table
+     */
+    private void validateStaticPartition(UnboundIcebergTableSink<?> sink, 
IcebergExternalTable table) {
+        Map<String, Expression> staticPartitions = 
sink.getStaticPartitionKeyValues();
+        if (staticPartitions == null || staticPartitions.isEmpty()) {
+            return;
+        }
+
+        Table icebergTable = table.getIcebergTable();
+        PartitionSpec partitionSpec = icebergTable.spec();
+
+        // Check if table is partitioned
+        if (!partitionSpec.isPartitioned()) {
+            throw new AnalysisException(
+                    String.format("Table %s is not partitioned, cannot use 
static partition syntax", table.getName()));
+        }
+
+        // Get partition field names
+        Map<String, PartitionField> partitionFieldMap = Maps.newHashMap();
+        for (PartitionField field : partitionSpec.fields()) {
+            String fieldName = field.name();
+            partitionFieldMap.put(fieldName, field);
+        }
+
+        // Validate each static partition column
+        for (Map.Entry<String, Expression> entry : 
staticPartitions.entrySet()) {
+            String partitionColName = entry.getKey();
+            Expression partitionValue = entry.getValue();
+
+            // 1. Check if partition column exists
+            if (!partitionFieldMap.containsKey(partitionColName)) {
+                throw new AnalysisException(
+                        String.format("Unknown partition column '%s' in table 
'%s'. Available partition columns: %s",
+                                partitionColName, table.getName(), 
partitionFieldMap.keySet()));
+            }
+
+            // 2. Check if it's an identity partition.
+            // Static partition overwrite is only supported for identity 
partitions.
+            PartitionField field = partitionFieldMap.get(partitionColName);
+            if (!field.transform().isIdentity()) {
+                throw new AnalysisException(
+                        String.format("Cannot use static partition syntax for 
non-identity partition field '%s'"
+                                + " (transform: %s).", partitionColName, 
field.transform().toString()));
+            }
+
+            // 3. Check if partition value is a constant expression
+            if (!partitionValue.isConstant()) {
+                throw new AnalysisException(
+                        String.format("Partition value for column '%s' must be 
a constant expression, but got: %s",
+                                partitionColName, partitionValue));
+            }
+
+            // 4. Validate partition value type
+            if (!(partitionValue instanceof Literal)) {
+                throw new AnalysisException(
+                        String.format("Partition value for column '%s' must be 
a literal, but got: %s",
+                                partitionColName, partitionValue));
+            }

Review Comment:
   [nitpick] The validation checks if `partitionValue.isConstant()` on line 
710, and then checks if it's an instance of `Literal` on line 717. These checks 
are redundant - if something is a `Literal`, it's already constant. The 
`isConstant()` check could be removed, or the comment should explain why both 
checks are needed if there's a subtle distinction.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java:
##########
@@ -1434,13 +1433,64 @@ public Pair<Boolean, List<String>> 
visitPartitionSpec(PartitionSpecContext ctx)
                 partitions = null;
             } else if (ctx.partition != null) {
                 partitions = ImmutableList.of(ctx.partition.getText());
+            } else if (ctx.partitionKeyValue() != null && 
!ctx.partitionKeyValue().isEmpty()) {
+                // Static partition: PARTITION (col1='val1', col2='val2')
+                // For backward compatibility with callers expecting 
Pair<Boolean, List<String>>,
+                // return empty list here. Use parseInsertPartitionSpec() for 
full support.
+                partitions = ImmutableList.of();

Review Comment:
   [nitpick] When static partition syntax is detected (lines 1436-1440), the 
method returns an empty list for backward compatibility. However, this is not 
clearly documented and could cause confusion for callers of this method who 
don't expect an empty list for static partitions. Consider adding a comment 
explaining that static partition handling requires using 
`parseInsertPartitionSpec()` instead, or update the method documentation.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java:
##########
@@ -394,16 +400,33 @@ private void insertIntoAutoDetect(ConnectContext ctx, 
StmtExecutor executor, lon
         } else if (logicalQuery instanceof UnboundHiveTableSink) {
             insertCtx = new HiveInsertCommandContext();
             ((HiveInsertCommandContext) insertCtx).setOverwrite(true);
-        } else if (logicalQuery instanceof UnboundIcebergTableSink) {
-            insertCtx = new IcebergInsertCommandContext();
-            ((IcebergInsertCommandContext) insertCtx).setOverwrite(true);
-            branchName.ifPresent(notUsed -> ((IcebergInsertCommandContext) 
insertCtx).setBranchName(branchName));
         } else {
-            throw new UserException("Current catalog does not support insert 
overwrite yet.");
+            throw new UserException("Current catalog does not support insert 
overwrite with auto-detect partition.");

Review Comment:
   The removed Iceberg support in `insertIntoAutoDetect` appears to be a 
regression. The method previously supported `UnboundIcebergTableSink` but now 
throws an exception for all catalog types except `UnboundTableSink` and 
`UnboundHiveTableSink`. This breaks auto-detect functionality for Iceberg 
tables. If this removal is intentional (e.g., Iceberg doesn't support 
auto-detect), it should be clearly documented or the error message should be 
more specific about why Iceberg is excluded.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to