This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 35741bfc6da branch-3.1: [fix](iceberg) Fix partition value conversion for Iceberg data file generation #58603 (#58907)
35741bfc6da is described below

commit 35741bfc6daf86936db21b6a5043dafed1041887
Author: Socrates <[email protected]>
AuthorDate: Mon Dec 15 11:18:02 2025 +0800

    branch-3.1: [fix](iceberg) Fix partition value conversion for Iceberg data file generation #58603 (#58907)
    
    bp: #58603
---
 .../doris/datasource/iceberg/IcebergUtils.java     | 93 ++++++++++++++++++++++
 .../iceberg/helper/IcebergWriterHelper.java        | 54 +++++++++++--
 .../datasource/iceberg/IcebergTransactionTest.java |  4 +
 .../write/test_iceberg_write_partitions.out        | 25 +++++-
 .../write/test_iceberg_write_partitions.groovy     | 49 ++++++++++++
 5 files changed, 219 insertions(+), 6 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index 698469db93b..268b73642e3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -62,6 +62,8 @@ import org.apache.doris.datasource.mvcc.MvccSnapshot;
 import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.datasource.property.metastore.HMSBaseProperties;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
+import org.apache.doris.nereids.trees.expressions.literal.Result;
+import org.apache.doris.nereids.util.DateUtils;
 import org.apache.doris.thrift.TExprOpcode;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
@@ -124,6 +126,8 @@ import java.time.Month;
 import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoField;
+import java.time.temporal.TemporalAccessor;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -801,6 +805,95 @@ public class IcebergUtils {
         }
     }
 
+    /**
+     * Convert human-readable partition value string to appropriate Java type 
for
+     * Iceberg expression.
+     * This is used for static partition overwrite where user specifies 
partition
+     * values like PARTITION (dt='2025-01-01', region='bj').
+     *
+     * @param valueStr    Partition value as human-readable string (e.g.,
+     *                    "2025-01-01" for date)
+     * @param icebergType Iceberg type of the partition field
+     * @return Converted value object suitable for Iceberg Expression, or null 
if
+     *         value is null
+     */
+    public static Object parsePartitionValueFromString(String valueStr, 
org.apache.iceberg.types.Type icebergType) {
+        if (valueStr == null) {
+            return null;
+        }
+
+        try {
+            switch (icebergType.typeId()) {
+                case STRING:
+                    return valueStr;
+                case INTEGER:
+                    return Integer.parseInt(valueStr);
+                case LONG:
+                    return Long.parseLong(valueStr);
+                case FLOAT:
+                    return Float.parseFloat(valueStr);
+                case DOUBLE:
+                    return Double.parseDouble(valueStr);
+                case BOOLEAN:
+                    return Boolean.parseBoolean(valueStr);
+                case DATE:
+                    // Parse date string (format: yyyy-MM-dd) to epoch day
+                    return (int) LocalDate.parse(valueStr, 
DateTimeFormatter.ISO_LOCAL_DATE).toEpochDay();
+                case TIMESTAMP:
+                    return parseTimestampToMicros(valueStr, (TimestampType) 
icebergType);
+                case DECIMAL:
+                    return new BigDecimal(valueStr);
+                default:
+                    throw new IllegalArgumentException("Unsupported partition 
value type: " + icebergType);
+            }
+        } catch (Exception e) {
+            throw new IllegalArgumentException(String.format("Failed to 
convert partition value '%s' to type %s",
+                    valueStr, icebergType), e);
+        }
+    }
+
+    /**
+     * Parse timestamp string to microseconds using Doris's built-in datetime
+     * parser.
+     *
+     * @param valueStr      Timestamp string in various formats
+     * @param timestampType Iceberg timestamp type (with or without timezone)
+     * @return Microseconds since epoch
+     * @throws IllegalArgumentException if the timestamp string cannot be 
parsed
+     */
+    private static long parseTimestampToMicros(String valueStr, TimestampType 
timestampType) {
+        // Use Doris's built-in DateLiteral.parseDateTime() which supports 
multiple formats
+        Result<TemporalAccessor, ? extends Exception> parseResult =
+                
org.apache.doris.nereids.trees.expressions.literal.DateLiteral.parseDateTime(valueStr);
+
+        if (parseResult.isError()) {
+            throw new IllegalArgumentException(
+                    String.format("Failed to parse timestamp string '%s'", 
valueStr));
+        }
+
+        TemporalAccessor temporal = parseResult.get();
+
+        // Build LocalDateTime from TemporalAccessor using DateUtils helper 
methods
+        LocalDateTime ldt = LocalDateTime.of(
+                DateUtils.getOrDefault(temporal, ChronoField.YEAR),
+                DateUtils.getOrDefault(temporal, ChronoField.MONTH_OF_YEAR),
+                DateUtils.getOrDefault(temporal, ChronoField.DAY_OF_MONTH),
+                DateUtils.getHourOrDefault(temporal),
+                DateUtils.getOrDefault(temporal, ChronoField.MINUTE_OF_HOUR),
+                DateUtils.getOrDefault(temporal, ChronoField.SECOND_OF_MINUTE),
+                DateUtils.getOrDefault(temporal, ChronoField.NANO_OF_SECOND));
+
+        // Convert to microseconds
+        ZoneId zone = timestampType.shouldAdjustToUTC()
+                ? DateUtils.getTimeZone()
+                : ZoneId.of("UTC");
+
+        long epochSecond = ldt.atZone(zone).toInstant().getEpochSecond();
+        long microSecond = DateUtils.getOrDefault(temporal, 
ChronoField.NANO_OF_SECOND) / 1_000L;
+
+        return epochSecond * 1_000_000L + microSecond;
+    }
+
     private static void updateIcebergColumnUniqueId(Column column, 
Types.NestedField icebergField) {
         column.setUniqueId(icebergField.fieldId());
         List<NestedField> icebergFields = Lists.newArrayList();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
index 4171a0536f9..d429e95e8ce 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource.iceberg.helper;
 
+import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.datasource.statistics.CommonStatistics;
 import org.apache.doris.thrift.TIcebergCommitData;
 
@@ -24,8 +25,10 @@ import com.google.common.base.VerifyException;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.types.Types;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -51,7 +54,7 @@ public class IcebergWriterHelper {
             long recordCount = commitData.getRowCount();
             CommonStatistics stat = new CommonStatistics(recordCount, 
DEFAULT_FILE_COUNT, fileSize);
 
-            Optional<List<String>> partValues = Optional.empty();
+            Optional<PartitionData> partitionData = Optional.empty();
             //get and check partitionValues when table is partitionedTable
             if (spec.isPartitioned()) {
                 List<String> partitionValues = commitData.getPartitionValues();
@@ -60,9 +63,11 @@ public class IcebergWriterHelper {
                 }
                 partitionValues = partitionValues.stream().map(s -> 
s.equals("null") ? null : s)
                         .collect(Collectors.toList());
-                partValues = Optional.of(partitionValues);
+
+                // Convert human-readable partition values to PartitionData
+                partitionData = 
Optional.of(convertToPartitionData(partitionValues, spec));
             }
-            DataFile dataFile = genDataFile(format, location, spec, 
partValues, stat);
+            DataFile dataFile = genDataFile(format, location, spec, 
partitionData, stat);
             dataFiles.add(dataFile);
         }
         return WriteResult.builder()
@@ -75,7 +80,7 @@ public class IcebergWriterHelper {
             FileFormat format,
             String location,
             PartitionSpec spec,
-            Optional<List<String>> partValues,
+            Optional<PartitionData> partitionData,
             CommonStatistics statistics) {
 
         DataFiles.Builder builder = DataFiles.builder(spec)
@@ -84,8 +89,47 @@ public class IcebergWriterHelper {
                 .withRecordCount(statistics.getRowCount())
                 .withFormat(format);
 
-        partValues.ifPresent(builder::withPartitionValues);
+        partitionData.ifPresent(builder::withPartition);
 
         return builder.build();
     }
+
+    /**
+     * Convert human-readable partition values (from Backend) to PartitionData.
+     *
+     * Backend sends partition values as human-readable strings:
+     * - DATE: "2025-01-25"
+     * - DATETIME: "2025-01-25 10:00:00"
+     */
+    private static PartitionData convertToPartitionData(
+            List<String> humanReadableValues, PartitionSpec spec) {
+        // Create PartitionData instance using the partition type from spec
+        PartitionData partitionData = new PartitionData(spec.partitionType());
+
+        // Get partition type fields to determine the result type of each 
partition field
+        Types.StructType partitionType = spec.partitionType();
+        List<Types.NestedField> partitionTypeFields = partitionType.fields();
+
+        for (int i = 0; i < humanReadableValues.size(); i++) {
+            String humanReadableValue = humanReadableValues.get(i);
+
+            if (humanReadableValue == null) {
+                partitionData.set(i, null);
+                continue;
+            }
+
+            // Get the partition field's result type
+            Types.NestedField partitionTypeField = partitionTypeFields.get(i);
+            org.apache.iceberg.types.Type partitionFieldType = 
partitionTypeField.type();
+
+            // Convert the human-readable value to internal format object
+            Object internalValue = IcebergUtils.parsePartitionValueFromString(
+                    humanReadableValue, partitionFieldType);
+
+            // Set the value in PartitionData
+            partitionData.set(i, internalValue);
+        }
+
+        return partitionData;
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
index d4d7419a556..1a6e5ee9890 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
@@ -190,6 +190,10 @@ public class IcebergTransactionTest {
         try (MockedStatic<IcebergUtils> mockedStatic = 
Mockito.mockStatic(IcebergUtils.class)) {
             mockedStatic.when(() -> 
IcebergUtils.getIcebergTable(ArgumentMatchers.any(ExternalTable.class)))
                     .thenReturn(table);
+            // Allow parsePartitionValueFromString to call the real 
implementation
+            mockedStatic.when(() -> IcebergUtils.parsePartitionValueFromString(
+                    ArgumentMatchers.any(), ArgumentMatchers.any()))
+                    .thenCallRealMethod();
             IcebergTransaction txn = getTxn();
             txn.updateIcebergCommitData(ctdList);
             txn.beginInsert(icebergExternalTable, Optional.empty());
diff --git 
a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_partitions.out
 
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_partitions.out
index c101079be33..d5320e4ac5f 100644
--- 
a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_partitions.out
+++ 
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_partitions.out
@@ -35,6 +35,12 @@ true 3       2147483647      9223372036854775807     123.45  
123456.789
 123456.789012  3       \N      2024-03-20
 123456.789012  3       string_value    2024-03-20
 
+-- !datetime_partition01 --
+1      2025-01-01T10:00
+
+-- !datetime_partition02 --
+1      2025-01-01T10:00
+
 -- !columns_out_of_order01 --
 3      6       1       4       2       5
 
@@ -77,6 +83,12 @@ true 3       2147483647      9223372036854775807     123.45  
123456.789
 123456.789012  3       \N      2024-03-20
 123456.789012  3       string_value    2024-03-20
 
+-- !datetime_partition01 --
+1      2025-01-01T10:00
+
+-- !datetime_partition02 --
+1      2025-01-01T10:00
+
 -- !columns_out_of_order01 --
 3      6       1       4       2       5
 
@@ -119,6 +131,12 @@ true       3       2147483647      9223372036854775807     
123.45  123456.789
 123456.789012  3       \N      2024-03-20
 123456.789012  3       string_value    2024-03-20
 
+-- !datetime_partition01 --
+1      2025-01-01T10:00
+
+-- !datetime_partition02 --
+1      2025-01-01T10:00
+
 -- !columns_out_of_order01 --
 3      6       1       4       2       5
 
@@ -161,9 +179,14 @@ true       3       2147483647      9223372036854775807     
123.45  123456.789
 123456.789012  3       \N      2024-03-20
 123456.789012  3       string_value    2024-03-20
 
+-- !datetime_partition01 --
+1      2025-01-01T10:00
+
+-- !datetime_partition02 --
+1      2025-01-01T10:00
+
 -- !columns_out_of_order01 --
 3      6       1       4       2       5
 
 -- !columns_out_of_order02 --
 1      2       3       4       5       6
-
diff --git 
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_partitions.groovy
 
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_partitions.groovy
index aa8589eddf3..94b04a164b1 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_partitions.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_partitions.groovy
@@ -120,6 +120,54 @@ suite("test_iceberg_write_partitions", 
"p0,external,iceberg,external_docker,exte
         sql """ DROP TABLE iceberg_all_partition_types2_${format_compression}; 
"""
     }
 
+    def test_datetime_partition = { String format_compression, String 
catalog_name ->
+        def parts = format_compression.split("_")
+        def format = parts[0]
+        def compression = parts[1]
+        sql """ drop table if exists 
datetime_partition_source_tbl_${format_compression} """
+        sql """
+            CREATE TABLE datetime_partition_source_tbl_${format_compression} (
+                `id` bigint,
+                `ts` datetime
+            ) ENGINE = iceberg
+            PARTITION BY LIST (ts) ()
+            properties (
+                "compression-codec" = ${compression},
+                "write-format"=${format}
+            )
+        """;
+        sql """ drop table if exists 
datetime_partition_target_tbl_${format_compression} """
+        sql """
+            CREATE TABLE datetime_partition_target_tbl_${format_compression} (
+                `id` bigint,
+                `ts` datetime
+            ) ENGINE = iceberg
+            PARTITION BY LIST (ts) ()
+            properties (
+                "compression-codec" = ${compression},
+                "write-format"=${format}
+            )
+        """;
+
+        sql """
+            INSERT INTO datetime_partition_source_tbl_${format_compression} (
+                id, ts
+            ) VALUES (1, '2025-01-01 10:00:00');
+            """
+        order_qt_datetime_partition01 """ SELECT * FROM 
datetime_partition_source_tbl_${format_compression} """
+
+        sql """
+            INSERT INTO datetime_partition_target_tbl_${format_compression} (
+                id, ts
+            ) VALUES (1, '2025-01-01 10:00:00');
+            """
+        order_qt_datetime_partition02 """ SELECT * FROM 
datetime_partition_target_tbl_${format_compression} """
+
+        sql """ drop table datetime_partition_source_tbl_${format_compression} 
"""
+        sql """ drop table datetime_partition_target_tbl_${format_compression} 
"""
+        sql """ drop database if exists `test_datetime_partition` """;
+    }
+
     def test_columns_out_of_order = {  String format_compression, String 
catalog_name ->
         def parts = format_compression.split("_")
         def format = parts[0]
@@ -218,6 +266,7 @@ suite("test_iceberg_write_partitions", 
"p0,external,iceberg,external_docker,exte
                 logger.info("Process format_compression " + format_compression)
                 q01(format_compression, iceberg_catalog_name, 
hive_catalog_name)
                 q02(format_compression, iceberg_catalog_name, 
hive_catalog_name)
+                test_datetime_partition(format_compression, 
iceberg_catalog_name)
                 test_columns_out_of_order(format_compression, 
iceberg_catalog_name)
             }
             sql """drop catalog if exists ${iceberg_catalog_name}"""


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to