This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 35741bfc6da branch-3.1: [fix](iceberg) Fix partition value conversion for Iceberg data file generation #58603 (#58907)
35741bfc6da is described below
commit 35741bfc6daf86936db21b6a5043dafed1041887
Author: Socrates <[email protected]>
AuthorDate: Mon Dec 15 11:18:02 2025 +0800
branch-3.1: [fix](iceberg) Fix partition value conversion for Iceberg data file generation #58603 (#58907)
bp: #58603
---
.../doris/datasource/iceberg/IcebergUtils.java | 93 ++++++++++++++++++++++
.../iceberg/helper/IcebergWriterHelper.java | 54 +++++++++++--
.../datasource/iceberg/IcebergTransactionTest.java | 4 +
.../write/test_iceberg_write_partitions.out | 25 +++++-
.../write/test_iceberg_write_partitions.groovy | 49 ++++++++++++
5 files changed, 219 insertions(+), 6 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index 698469db93b..268b73642e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -62,6 +62,8 @@ import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.datasource.property.metastore.HMSBaseProperties;
import org.apache.doris.nereids.exceptions.NotSupportedException;
+import org.apache.doris.nereids.trees.expressions.literal.Result;
+import org.apache.doris.nereids.util.DateUtils;
import org.apache.doris.thrift.TExprOpcode;
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -124,6 +126,8 @@ import java.time.Month;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoField;
+import java.time.temporal.TemporalAccessor;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
@@ -801,6 +805,95 @@ public class IcebergUtils {
}
}
+ /**
+ * Convert a human-readable partition value string to the appropriate Java type for
+ * an Iceberg expression.
+ * This is used for static partition overwrite, where the user specifies partition
+ * values like PARTITION (dt='2025-01-01', region='bj').
+ *
+ * @param valueStr Partition value as a human-readable string (e.g.,
+ * "2025-01-01" for a date)
+ * @param icebergType Iceberg type of the partition field
+ * @return Converted value object suitable for an Iceberg Expression, or null if
+ * the value is null
+ */
+ public static Object parsePartitionValueFromString(String valueStr, org.apache.iceberg.types.Type icebergType) {
+ if (valueStr == null) {
+ return null;
+ }
+
+ try {
+ switch (icebergType.typeId()) {
+ case STRING:
+ return valueStr;
+ case INTEGER:
+ return Integer.parseInt(valueStr);
+ case LONG:
+ return Long.parseLong(valueStr);
+ case FLOAT:
+ return Float.parseFloat(valueStr);
+ case DOUBLE:
+ return Double.parseDouble(valueStr);
+ case BOOLEAN:
+ return Boolean.parseBoolean(valueStr);
+ case DATE:
+ // Parse date string (format: yyyy-MM-dd) to epoch day
+ return (int) LocalDate.parse(valueStr, DateTimeFormatter.ISO_LOCAL_DATE).toEpochDay();
+ case TIMESTAMP:
+ return parseTimestampToMicros(valueStr, (TimestampType) icebergType);
+ case DECIMAL:
+ return new BigDecimal(valueStr);
+ default:
+ throw new IllegalArgumentException("Unsupported partition
value type: " + icebergType);
+ }
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("Failed to
convert partition value '%s' to type %s",
+ valueStr, icebergType), e);
+ }
+ }
+
+ /**
+ * Parse timestamp string to microseconds using Doris's built-in datetime
+ * parser.
+ *
+ * @param valueStr Timestamp string in various formats
+ * @param timestampType Iceberg timestamp type (with or without timezone)
+ * @return Microseconds since epoch
+ * @throws IllegalArgumentException if the timestamp string cannot be parsed
+ */
+ private static long parseTimestampToMicros(String valueStr, TimestampType timestampType) {
+ // Use Doris's built-in DateLiteral.parseDateTime() which supports multiple formats
+ Result<TemporalAccessor, ? extends Exception> parseResult =
+ org.apache.doris.nereids.trees.expressions.literal.DateLiteral.parseDateTime(valueStr);
+
+ if (parseResult.isError()) {
+ throw new IllegalArgumentException(
+ String.format("Failed to parse timestamp string '%s'",
valueStr));
+ }
+
+ TemporalAccessor temporal = parseResult.get();
+
+ // Build LocalDateTime from TemporalAccessor using DateUtils helper methods
+ LocalDateTime ldt = LocalDateTime.of(
+ DateUtils.getOrDefault(temporal, ChronoField.YEAR),
+ DateUtils.getOrDefault(temporal, ChronoField.MONTH_OF_YEAR),
+ DateUtils.getOrDefault(temporal, ChronoField.DAY_OF_MONTH),
+ DateUtils.getHourOrDefault(temporal),
+ DateUtils.getOrDefault(temporal, ChronoField.MINUTE_OF_HOUR),
+ DateUtils.getOrDefault(temporal, ChronoField.SECOND_OF_MINUTE),
+ DateUtils.getOrDefault(temporal, ChronoField.NANO_OF_SECOND));
+
+ // Convert to microseconds
+ ZoneId zone = timestampType.shouldAdjustToUTC()
+ ? DateUtils.getTimeZone()
+ : ZoneId.of("UTC");
+
+ long epochSecond = ldt.atZone(zone).toInstant().getEpochSecond();
+ long microSecond = DateUtils.getOrDefault(temporal, ChronoField.NANO_OF_SECOND) / 1_000L;
+
+ return epochSecond * 1_000_000L + microSecond;
+ }
+
private static void updateIcebergColumnUniqueId(Column column, Types.NestedField icebergField) {
column.setUniqueId(icebergField.fieldId());
List<NestedField> icebergFields = Lists.newArrayList();
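
[Editor's note: a self-contained sketch, not part of the patch, mirroring the DATE and
TIMESTAMP branches above with plain java.time. The class name is illustrative, and UTC
stands in for the session zone that DateUtils.getTimeZone() would normally supply.]

    import java.time.LocalDate;
    import java.time.LocalDateTime;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;

    public class PartitionValueConversionSketch {
        public static void main(String[] args) {
            // DATE: Iceberg represents dates as days since 1970-01-01.
            int epochDay = (int) LocalDate
                    .parse("2025-01-01", DateTimeFormatter.ISO_LOCAL_DATE)
                    .toEpochDay();
            System.out.println(epochDay); // 20089

            // TIMESTAMP: Iceberg represents timestamps as microseconds since epoch.
            // UTC is assumed here; the patch chooses the zone from the timestamp type.
            LocalDateTime ldt = LocalDateTime.parse("2025-01-01T10:00:00");
            long micros = ldt.atZone(ZoneOffset.UTC).toInstant().getEpochSecond() * 1_000_000L
                    + ldt.getNano() / 1_000L;
            System.out.println(micros); // 1735725600000000
        }
    }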
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
index 4171a0536f9..d429e95e8ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
@@ -17,6 +17,7 @@
package org.apache.doris.datasource.iceberg.helper;
+import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.statistics.CommonStatistics;
import org.apache.doris.thrift.TIcebergCommitData;
@@ -24,8 +25,10 @@ import com.google.common.base.VerifyException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.types.Types;
import java.util.ArrayList;
import java.util.List;
@@ -51,7 +54,7 @@ public class IcebergWriterHelper {
long recordCount = commitData.getRowCount();
CommonStatistics stat = new CommonStatistics(recordCount, DEFAULT_FILE_COUNT, fileSize);
- Optional<List<String>> partValues = Optional.empty();
+ Optional<PartitionData> partitionData = Optional.empty();
//get and check partitionValues when table is partitionedTable
if (spec.isPartitioned()) {
List<String> partitionValues = commitData.getPartitionValues();
@@ -60,9 +63,11 @@ public class IcebergWriterHelper {
}
partitionValues = partitionValues.stream().map(s -> s.equals("null") ? null : s)
.collect(Collectors.toList());
- partValues = Optional.of(partitionValues);
+
+ // Convert human-readable partition values to PartitionData
+ partitionData = Optional.of(convertToPartitionData(partitionValues, spec));
}
- DataFile dataFile = genDataFile(format, location, spec, partValues, stat);
+ DataFile dataFile = genDataFile(format, location, spec, partitionData, stat);
dataFiles.add(dataFile);
}
return WriteResult.builder()
@@ -75,7 +80,7 @@ public class IcebergWriterHelper {
FileFormat format,
String location,
PartitionSpec spec,
- Optional<List<String>> partValues,
+ Optional<PartitionData> partitionData,
CommonStatistics statistics) {
DataFiles.Builder builder = DataFiles.builder(spec)
@@ -84,8 +89,47 @@ public class IcebergWriterHelper {
.withRecordCount(statistics.getRowCount())
.withFormat(format);
- partValues.ifPresent(builder::withPartitionValues);
+ partitionData.ifPresent(builder::withPartition);
return builder.build();
}
+
+ /**
+ * Convert human-readable partition values (from the Backend) to PartitionData.
+ *
+ * The Backend sends partition values as human-readable strings:
+ * - DATE: "2025-01-25"
+ * - DATETIME: "2025-01-25 10:00:00"
+ */
+ private static PartitionData convertToPartitionData(
+ List<String> humanReadableValues, PartitionSpec spec) {
+ // Create PartitionData instance using the partition type from spec
+ PartitionData partitionData = new PartitionData(spec.partitionType());
+
+ // Get partition type fields to determine the result type of each partition field
+ Types.StructType partitionType = spec.partitionType();
+ List<Types.NestedField> partitionTypeFields = partitionType.fields();
+
+ for (int i = 0; i < humanReadableValues.size(); i++) {
+ String humanReadableValue = humanReadableValues.get(i);
+
+ if (humanReadableValue == null) {
+ partitionData.set(i, null);
+ continue;
+ }
+
+ // Get the partition field's result type
+ Types.NestedField partitionTypeField = partitionTypeFields.get(i);
+ org.apache.iceberg.types.Type partitionFieldType = partitionTypeField.type();
+
+ // Convert the human-readable value to internal format object
+ Object internalValue = IcebergUtils.parsePartitionValueFromString(
+ humanReadableValue, partitionFieldType);
+
+ // Set the value in PartitionData
+ partitionData.set(i, internalValue);
+ }
+
+ return partitionData;
+ }
}
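
[Editor's note: a minimal sketch, not part of the patch, of how converted values end up
in a PartitionData for an identity-partitioned table. The schema and literal values are
hypothetical, and it assumes the same Iceberg version as the patch, where the
PartitionData constructor is accessible.]

    import org.apache.doris.datasource.iceberg.IcebergUtils;
    import org.apache.iceberg.PartitionData;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;
    import java.util.Arrays;
    import java.util.List;

    public class PartitionDataSketch {
        public static void main(String[] args) {
            // Hypothetical table partitioned by identity(dt), identity(region).
            Schema schema = new Schema(
                    Types.NestedField.required(1, "dt", Types.DateType.get()),
                    Types.NestedField.required(2, "region", Types.StringType.get()));
            PartitionSpec spec = PartitionSpec.builderFor(schema)
                    .identity("dt").identity("region").build();

            // Human-readable strings as the Backend would send them.
            List<String> fromBackend = Arrays.asList("2025-01-01", "bj");

            PartitionData partitionData = new PartitionData(spec.partitionType());
            List<Types.NestedField> fields = spec.partitionType().fields();
            for (int i = 0; i < fromBackend.size(); i++) {
                // DATE "2025-01-01" becomes 20089 (epoch days); STRING passes through.
                partitionData.set(i, IcebergUtils.parsePartitionValueFromString(
                        fromBackend.get(i), fields.get(i).type()));
            }
        }
    }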
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
index d4d7419a556..1a6e5ee9890 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
@@ -190,6 +190,10 @@ public class IcebergTransactionTest {
try (MockedStatic<IcebergUtils> mockedStatic = Mockito.mockStatic(IcebergUtils.class)) {
mockedStatic.when(() -> IcebergUtils.getIcebergTable(ArgumentMatchers.any(ExternalTable.class)))
.thenReturn(table);
+ // Allow parsePartitionValueFromString to call the real implementation
+ mockedStatic.when(() -> IcebergUtils.parsePartitionValueFromString(
+ ArgumentMatchers.any(), ArgumentMatchers.any()))
+ .thenCallRealMethod();
IcebergTransaction txn = getTxn();
txn.updateIcebergCommitData(ctdList);
txn.beginInsert(icebergExternalTable, Optional.empty());
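
[Editor's note: why the extra stub is needed — with Mockito.mockStatic, every static
method of the class is replaced by a default stub (returning null/zero) for the scope of
the try block, so without the explicit thenCallRealMethod() the commit path would receive
null instead of converted partition values. The pattern in outline; the ellipsis stands
for the code under test.]

    try (MockedStatic<IcebergUtils> mocked = Mockito.mockStatic(IcebergUtils.class)) {
        // Stub one static method with a canned return value...
        mocked.when(() -> IcebergUtils.getIcebergTable(ArgumentMatchers.any(ExternalTable.class)))
                .thenReturn(table);
        // ...while routing another back to its real implementation.
        mocked.when(() -> IcebergUtils.parsePartitionValueFromString(
                ArgumentMatchers.any(), ArgumentMatchers.any()))
                .thenCallRealMethod();
        // ... exercise the transaction, which calls both methods ...
    }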
diff --git a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_partitions.out b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_partitions.out
index c101079be33..d5320e4ac5f 100644
--- a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_partitions.out
+++ b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_partitions.out
@@ -35,6 +35,12 @@ true 3 2147483647 9223372036854775807 123.45 123456.789
123456.789012 3 \N 2024-03-20
123456.789012 3 string_value 2024-03-20
+-- !datetime_partition01 --
+1 2025-01-01T10:00
+
+-- !datetime_partition02 --
+1 2025-01-01T10:00
+
-- !columns_out_of_order01 --
3 6 1 4 2 5
@@ -77,6 +83,12 @@ true 3 2147483647 9223372036854775807 123.45 123456.789
123456.789012 3 \N 2024-03-20
123456.789012 3 string_value 2024-03-20
+-- !datetime_partition01 --
+1 2025-01-01T10:00
+
+-- !datetime_partition02 --
+1 2025-01-01T10:00
+
-- !columns_out_of_order01 --
3 6 1 4 2 5
@@ -119,6 +131,12 @@ true 3 2147483647 9223372036854775807 123.45 123456.789
123456.789012 3 \N 2024-03-20
123456.789012 3 string_value 2024-03-20
+-- !datetime_partition01 --
+1 2025-01-01T10:00
+
+-- !datetime_partition02 --
+1 2025-01-01T10:00
+
-- !columns_out_of_order01 --
3 6 1 4 2 5
@@ -161,9 +179,14 @@ true 3 2147483647 9223372036854775807 123.45 123456.789
123456.789012 3 \N 2024-03-20
123456.789012 3 string_value 2024-03-20
+-- !datetime_partition01 --
+1 2025-01-01T10:00
+
+-- !datetime_partition02 --
+1 2025-01-01T10:00
+
-- !columns_out_of_order01 --
3 6 1 4 2 5
-- !columns_out_of_order02 --
1 2 3 4 5 6
-
diff --git a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_partitions.groovy b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_partitions.groovy
index aa8589eddf3..94b04a164b1 100644
--- a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_partitions.groovy
+++ b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_partitions.groovy
@@ -120,6 +120,54 @@ suite("test_iceberg_write_partitions",
"p0,external,iceberg,external_docker,exte
sql """ DROP TABLE iceberg_all_partition_types2_${format_compression};
"""
}
+ def test_datetime_partition = { String format_compression, String catalog_name ->
+ def parts = format_compression.split("_")
+ def format = parts[0]
+ def compression = parts[1]
+ sql """ drop table if exists
datetime_partition_source_tbl_${format_compression} """
+ sql """
+ CREATE TABLE datetime_partition_source_tbl_${format_compression} (
+ `id` bigint,
+ `ts` datetime
+ ) ENGINE = iceberg
+ PARTITION BY LIST (ts) ()
+ properties (
+ "compression-codec" = ${compression},
+ "write-format"=${format}
+ )
+ """;
+ sql """ drop table if exists
datetime_partition_target_tbl_${format_compression} """
+ sql """
+ CREATE TABLE datetime_partition_target_tbl_${format_compression} (
+ `id` bigint,
+ `ts` datetime
+ ) ENGINE = iceberg
+ PARTITION BY LIST (ts) ()
+ properties (
+ "compression-codec" = ${compression},
+ "write-format"=${format}
+ )
+ """;
+
+ sql """
+ INSERT INTO datetime_partition_source_tbl_${format_compression} (
+ id, ts
+ ) VALUES (1, '2025-01-01 10:00:00');
+ """
+ order_qt_datetime_partition01 """ SELECT * FROM
datetime_partition_source_tbl_${format_compression} """
+
+ sql """
+ INSERT INTO datetime_partition_target_tbl_${format_compression} (
+ id, ts
+ ) VALUES (1, '2025-01-01 10:00:00');
+ """
+ order_qt_datetime_partition02 """ SELECT * FROM
datetime_partition_target_tbl_${format_compression} """
+
+ sql """ drop table datetime_partition_source_tbl_${format_compression}
"""
+ sql """ drop table datetime_partition_target_tbl_${format_compression}
"""
+ sql """ drop database if exists `test_datetime_partition` """;
+ }
+
def test_columns_out_of_order = { String format_compression, String catalog_name ->
def parts = format_compression.split("_")
def format = parts[0]
@@ -218,6 +266,7 @@ suite("test_iceberg_write_partitions",
"p0,external,iceberg,external_docker,exte
logger.info("Process format_compression " + format_compression)
q01(format_compression, iceberg_catalog_name, hive_catalog_name)
q02(format_compression, iceberg_catalog_name, hive_catalog_name)
+ test_datetime_partition(format_compression, iceberg_catalog_name)
test_columns_out_of_order(format_compression, iceberg_catalog_name)
}
sql """drop catalog if exists ${iceberg_catalog_name}"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]