This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new f8885f1d3 [lake/paimon] Resolve partition eagerly in record writer 
(#3023)
f8885f1d3 is described below

commit f8885f1d3158a47e0d22b163cfe202c2aa4d8504
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Apr 8 21:09:37 2026 +0800

    [lake/paimon] Resolve partition eagerly in record writer (#3023)
---
 .../fluss/utils/PartitionNameConverters.java       | 138 +++++++++++++++++++++
 .../org/apache/fluss/utils/PartitionUtils.java     |  51 ++++++++
 .../org/apache/fluss/utils/PartitionUtilsTest.java | 130 +++++++++++++++++++
 .../lake/paimon/tiering/PaimonLakeWriter.java      |   8 +-
 .../fluss/lake/paimon/tiering/RecordWriter.java    |  49 +++++++-
 .../paimon/tiering/append/AppendOnlyWriter.java    |  12 +-
 .../paimon/tiering/mergetree/MergeTreeWriter.java  |  12 +-
 7 files changed, 381 insertions(+), 19 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionNameConverters.java
 
b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionNameConverters.java
index e12a68d21..984cd6a0a 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionNameConverters.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionNameConverters.java
@@ -21,6 +21,7 @@ import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
 
 import java.time.Instant;
+import java.time.LocalDateTime;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
@@ -130,4 +131,141 @@ public class PartitionNameConverters {
                 .atOffset(ZoneOffset.UTC)
                 .format(TimestampFormatter);
     }
+
+    // ---- Reverse parsing methods (inverse of the above formatting methods) ----
+
+    /**
+     * Parses a hex string back to a byte array. Reverse of {@link #hexString(byte[])}.
+     *
+     * <p>Each pair of characters is decoded into one byte, high nibble first. Both upper- and
+     * lower-case hex digits are accepted.
+     *
+     * @param hex the hex-encoded text; must have an even length and contain only hex digits
+     * @return the decoded bytes (an empty array for an empty string)
+     * @throws IllegalArgumentException if {@code hex} has an odd length or contains a character
+     *     that is not a hexadecimal digit
+     */
+    public static byte[] parseHexString(String hex) {
+        int len = hex.length();
+        if ((len & 1) != 0) {
+            throw new IllegalArgumentException(
+                    "Invalid hex string '" + hex + "': length must be even.");
+        }
+        byte[] bytes = new byte[len / 2];
+        for (int i = 0; i < len; i += 2) {
+            int high = Character.digit(hex.charAt(i), 16);
+            int low = Character.digit(hex.charAt(i + 1), 16);
+            // Character.digit returns -1 for a non-hex character; without this check the -1
+            // would be folded into the byte and silently produce a wrong partition value.
+            if (high < 0 || low < 0) {
+                throw new IllegalArgumentException(
+                        "Invalid hex string '"
+                                + hex
+                                + "': non-hex character at index "
+                                + (high < 0 ? i : i + 1)
+                                + ".");
+            }
+            bytes[i / 2] = (byte) ((high << 4) | low);
+        }
+        return bytes;
+    }
+
+    /**
+     * Converts a "YYYY-MM-DD" date string into the number of days since the epoch. Reverse of
+     * {@link #dayToString(int)}.
+     *
+     * @param dayStr the date text; its first three '-'-separated fields are read as year, month
+     *     and day
+     * @return the epoch day of that date, narrowed to {@code int}
+     */
+    public static int parseDayString(String dayStr) {
+        String[] fields = dayStr.split("-");
+        int y = Integer.parseInt(fields[0]);
+        int m = Integer.parseInt(fields[1]);
+        int d = Integer.parseInt(fields[2]);
+        // Midnight of the given day; only the date component is used below.
+        LocalDateTime midnight = LocalDateTime.of(y, m, d, 0, 0);
+        return (int) midnight.toLocalDate().toEpochDay();
+    }
+
+    /**
+     * Converts an "HH-MM-SS_mmm" time string into milliseconds of the day. Reverse of {@link
+     * #milliToString(int)}.
+     *
+     * @param timeStr the time text: hour, minute and second separated by '-', followed by '_' and
+     *     the millisecond part
+     * @return the number of milliseconds represented by the four fields
+     */
+    public static int parseMilliString(String timeStr) {
+        String[] clockAndMillis = timeStr.split("_");
+        String[] hms = clockAndMillis[0].split("-");
+        int hours = Integer.parseInt(hms[0]);
+        int minutes = Integer.parseInt(hms[1]);
+        int seconds = Integer.parseInt(hms[2]);
+        int millis = Integer.parseInt(clockAndMillis[1]);
+        // Horner form of hours * 3600000 + minutes * 60000 + seconds * 1000 + millis.
+        return ((hours * 60 + minutes) * 60 + seconds) * 1000 + millis;
+    }
+
+    /**
+     * Turns a reformatted float string back into a {@link Float}. Reverse of {@link
+     * #reformatFloat(Float)}.
+     *
+     * <p>The literals "NaN", "Inf" and "-Inf" map to the corresponding special values; any other
+     * input has every '_' replaced by '.' and is then parsed as a decimal float.
+     *
+     * @param value the partition-name form of the float
+     * @return the parsed value
+     */
+    public static Float parseFloat(String value) {
+        switch (value) {
+            case "NaN":
+                return Float.NaN;
+            case "Inf":
+                return Float.POSITIVE_INFINITY;
+            case "-Inf":
+                return Float.NEGATIVE_INFINITY;
+            default:
+                String decimalText = value.replace('_', '.');
+                return java.lang.Float.parseFloat(decimalText);
+        }
+    }
+
+    /**
+     * Turns a reformatted double string back into a {@link Double}. Reverse of {@link
+     * #reformatDouble(Double)}.
+     *
+     * <p>The literals "NaN", "Inf" and "-Inf" map to the corresponding special values; any other
+     * input has every '_' replaced by '.' and is then parsed as a decimal double.
+     *
+     * @param value the partition-name form of the double
+     * @return the parsed value
+     */
+    public static Double parseDouble(String value) {
+        switch (value) {
+            case "NaN":
+                return Double.NaN;
+            case "Inf":
+                return Double.POSITIVE_INFINITY;
+            case "-Inf":
+                return Double.NEGATIVE_INFINITY;
+            default:
+                String decimalText = value.replace('_', '.');
+                return java.lang.Double.parseDouble(decimalText);
+        }
+    }
+
+    /**
+     * Rebuilds a {@link TimestampNtz} from its partition-name string. Reverse of {@link
+     * #timestampToString(TimestampNtz)}.
+     *
+     * <p>Format: "yyyy-MM-dd-HH-mm-ss_nnnnnnnnn" where the nano part has 0-9 digits.
+     *
+     * @param value the formatted timestamp text
+     * @return the timestamp built from the parsed epoch millis and nano-of-millisecond
+     */
+    public static TimestampNtz parseTimestampNtz(String value) {
+        long[] parsed = parseTimestampString(value);
+        long epochMillis = parsed[0];
+        int nanoOfMillisecond = (int) parsed[1];
+        return TimestampNtz.fromMillis(epochMillis, nanoOfMillisecond);
+    }
+
+    /**
+     * Rebuilds a {@link TimestampLtz} from its partition-name string. Reverse of {@link
+     * #timestampToString(TimestampLtz)}.
+     *
+     * @param value the formatted timestamp text
+     * @return the timestamp built from the parsed epoch millis and nano-of-millisecond
+     */
+    public static TimestampLtz parseTimestampLtz(String value) {
+        long[] parsed = parseTimestampString(value);
+        long epochMillis = parsed[0];
+        int nanoOfMillisecond = (int) parsed[1];
+        return TimestampLtz.fromEpochMillis(epochMillis, nanoOfMillisecond);
+    }
+
+    /**
+     * Splits a timestamp string into {@code [epochMillis, nanoOfMillisecond]}.
+     *
+     * <p>Format: "yyyy-MM-dd-HH-mm-ss_nnnnnnnnn". The text before the last '_' holds the
+     * '-'-separated date/time fields (hour, minute and second default to 0 when absent); the text
+     * after it is the fraction of the second, right-padded with zeros to nine digits.
+     *
+     * @param value the formatted timestamp text
+     * @return a two-element array: milliseconds since the epoch, then nanoseconds within that
+     *     millisecond
+     */
+    private static long[] parseTimestampString(String value) {
+        int separator = value.lastIndexOf('_');
+        boolean hasFraction = separator >= 0;
+        String dateTimeText = hasFraction ? value.substring(0, separator) : value;
+        String fractionText = hasFraction ? value.substring(separator + 1) : "";
+
+        String[] fields = dateTimeText.split("-");
+        int year = Integer.parseInt(fields[0]);
+        int month = Integer.parseInt(fields[1]);
+        int day = Integer.parseInt(fields[2]);
+        int hour = fields.length > 3 ? Integer.parseInt(fields[3]) : 0;
+        int min = fields.length > 4 ? Integer.parseInt(fields[4]) : 0;
+        int sec = fields.length > 5 ? Integer.parseInt(fields[5]) : 0;
+
+        // The fields are interpreted on the UTC time line: whole seconds since the epoch,
+        // scaled to milliseconds.
+        long epochMillis =
+                LocalDateTime.of(year, month, day, hour, min, sec).toEpochSecond(ZoneOffset.UTC)
+                        * 1000L;
+        if (fractionText.isEmpty()) {
+            return new long[] {epochMillis, 0};
+        }
+
+        // Right-pad the fraction to nine digits so it reads as nano-of-second.
+        StringBuilder padded = new StringBuilder(fractionText);
+        for (int digits = fractionText.length(); digits < 9; digits++) {
+            padded.append('0');
+        }
+        long nanoOfSecond = Long.parseLong(padded.toString());
+        // The leading three digits are millis-of-second; the remainder is nano-of-millisecond.
+        long millisOfSecond = nanoOfSecond / 1_000_000;
+        int nanoOfMillisecond = (int) (nanoOfSecond % 1_000_000);
+        return new long[] {epochMillis + millisOfSecond, nanoOfMillisecond};
+    }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java 
b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
index 47df48318..6474d2ce9 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
@@ -233,6 +233,57 @@ public class PartitionUtils {
         return DateTimeFormatter.ofPattern(format).format(zonedDateTime);
     }
 
+    /**
+     * Converts the string form of a partition value back into the typed Fluss internal
+     * representation for the given column type. This is the reverse operation of {@link
+     * #convertValueOfType(Object, DataTypeRoot)}.
+     *
+     * @param value the string representation of the partition value
+     * @param type the data type root of the partition column
+     * @return the typed value as a Fluss internal data structure
+     * @throws IllegalArgumentException if {@code type} is not handled here, or if a BOOLEAN value
+     *     is neither "true" nor "false" (case-insensitive)
+     */
+    public static Object parseValueOfType(String value, DataTypeRoot type) {
+        switch (type) {
+                // Integral types map directly onto the JDK parsers.
+            case TINYINT:
+                return Byte.parseByte(value);
+            case SMALLINT:
+                return Short.parseShort(value);
+            case INTEGER:
+                return Integer.parseInt(value);
+            case BIGINT:
+                return Long.parseLong(value);
+
+                // Character and binary types.
+            case CHAR:
+            case STRING:
+                return BinaryString.fromString(value);
+            case BINARY:
+            case BYTES:
+                return PartitionNameConverters.parseHexString(value);
+
+            case BOOLEAN:
+                {
+                    boolean isTrue = "true".equalsIgnoreCase(value);
+                    if (isTrue || "false".equalsIgnoreCase(value)) {
+                        return isTrue;
+                    }
+                    throw new IllegalArgumentException(
+                            "Invalid boolean partition value: '"
+                                    + value
+                                    + "'. Expected 'true' or 'false'.");
+                }
+
+                // Types whose partition-name form is produced by PartitionNameConverters.
+            case FLOAT:
+                return PartitionNameConverters.parseFloat(value);
+            case DOUBLE:
+                return PartitionNameConverters.parseDouble(value);
+            case DATE:
+                return PartitionNameConverters.parseDayString(value);
+            case TIME_WITHOUT_TIME_ZONE:
+                return PartitionNameConverters.parseMilliString(value);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return PartitionNameConverters.parseTimestampNtz(value);
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return PartitionNameConverters.parseTimestampLtz(value);
+
+            default:
+                throw new IllegalArgumentException("Unsupported DataTypeRoot: " + type);
+        }
+    }
+
     public static String convertValueOfType(Object value, DataTypeRoot type) {
         String stringPartitionKey = "";
         switch (type) {
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/utils/PartitionUtilsTest.java 
b/fluss-common/src/test/java/org/apache/fluss/utils/PartitionUtilsTest.java
index dae176265..aeb2c1d7c 100644
--- a/fluss-common/src/test/java/org/apache/fluss/utils/PartitionUtilsTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/utils/PartitionUtilsTest.java
@@ -43,6 +43,7 @@ import static 
org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
 import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
 import static org.apache.fluss.utils.PartitionUtils.convertValueOfType;
 import static org.apache.fluss.utils.PartitionUtils.generateAutoPartition;
+import static org.apache.fluss.utils.PartitionUtils.parseValueOfType;
 import static org.apache.fluss.utils.PartitionUtils.validatePartitionSpec;
 import static org.apache.fluss.utils.PartitionUtils.validatePartitionValues;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -424,4 +425,133 @@ class PartitionUtilsTest {
         String detectInvalid = detectInvalidName(toStringResult);
         assertThat(detectInvalid).isEqualTo(null);
     }
+
+    // ---- Round-trip tests for parseValueOfType ----
+
+    // Each test below formats a typed value with convertValueOfType, parses the resulting string
+    // with parseValueOfType, and expects to get the original value back.
+
+    @Test
+    void testRoundTripString() {
+        assertRoundTrip(BinaryString.fromString("Fluss"), DataTypeRoot.STRING);
+    }
+
+    @Test
+    void testRoundTripChar() {
+        assertRoundTrip(BinaryString.fromString("F"), DataTypeRoot.CHAR);
+    }
+
+    @Test
+    void testRoundTripBoolean() {
+        assertRoundTrip(true, DataTypeRoot.BOOLEAN);
+        assertRoundTrip(false, DataTypeRoot.BOOLEAN);
+    }
+
+    @Test
+    void testRoundTripBytes() {
+        byte[] value = new byte[] {0x10, 0x20, 0x30, 0x40, 0x50, (byte) 0xFF};
+        String str = convertValueOfType(value, DataTypeRoot.BYTES);
+        // Cast to byte[] before asserting so the array assertion overload is selected.
+        byte[] parsed = (byte[]) parseValueOfType(str, DataTypeRoot.BYTES);
+        assertThat(parsed).isEqualTo(value);
+    }
+
+    @Test
+    void testRoundTripBinary() {
+        byte[] value = new byte[] {0x10, 0x20, 0x30, 0x40, 0x50, (byte) 0xFF};
+        String str = convertValueOfType(value, DataTypeRoot.BINARY);
+        // Cast to byte[] before asserting so the array assertion overload is selected.
+        byte[] parsed = (byte[]) parseValueOfType(str, DataTypeRoot.BINARY);
+        assertThat(parsed).isEqualTo(value);
+    }
+
+    @Test
+    void testRoundTripTinyInt() {
+        // Positive and negative values.
+        assertRoundTrip((byte) 100, DataTypeRoot.TINYINT);
+        assertRoundTrip((byte) -100, DataTypeRoot.TINYINT);
+    }
+
+    @Test
+    void testRoundTripSmallInt() {
+        assertRoundTrip((short) -32760, DataTypeRoot.SMALLINT);
+    }
+
+    @Test
+    void testRoundTripInteger() {
+        // Positive and negative values.
+        assertRoundTrip(299000, DataTypeRoot.INTEGER);
+        assertRoundTrip(-299000, DataTypeRoot.INTEGER);
+    }
+
+    @Test
+    void testRoundTripBigInt() {
+        // Positive and negative values.
+        assertRoundTrip(1748662955428L, DataTypeRoot.BIGINT);
+        assertRoundTrip(-1748662955428L, DataTypeRoot.BIGINT);
+    }
+
+    @Test
+    void testRoundTripDate() {
+        // DATE values are passed as int day counts; 0 is included as a boundary value.
+        assertRoundTrip(20235, DataTypeRoot.DATE);
+        assertRoundTrip(0, DataTypeRoot.DATE);
+    }
+
+    @Test
+    void testRoundTripTime() {
+        // TIME values are passed as int milliseconds; 0 is included as a boundary value.
+        assertRoundTrip(5402199, DataTypeRoot.TIME_WITHOUT_TIME_ZONE);
+        assertRoundTrip(0, DataTypeRoot.TIME_WITHOUT_TIME_ZONE);
+    }
+
+    @Test
+    void testRoundTripFloat() {
+        // An ordinary value plus the three special values NaN, +Infinity and -Infinity.
+        assertRoundTrip(5.73f, DataTypeRoot.FLOAT);
+        assertRoundTrip(Float.NaN, DataTypeRoot.FLOAT);
+        assertRoundTrip(Float.POSITIVE_INFINITY, DataTypeRoot.FLOAT);
+        assertRoundTrip(Float.NEGATIVE_INFINITY, DataTypeRoot.FLOAT);
+    }
+
+    @Test
+    void testRoundTripDouble() {
+        // An ordinary value plus the three special values NaN, +Infinity and -Infinity.
+        assertRoundTrip(5.73, DataTypeRoot.DOUBLE);
+        assertRoundTrip(Double.NaN, DataTypeRoot.DOUBLE);
+        assertRoundTrip(Double.POSITIVE_INFINITY, DataTypeRoot.DOUBLE);
+        assertRoundTrip(Double.NEGATIVE_INFINITY, DataTypeRoot.DOUBLE);
+    }
+
+    @Test
+    void testRoundTripTimestampNtz() {
+        // With nanos
+        TimestampNtz ts1 = TimestampNtz.fromMillis(1748662955428L, 99988);
+        assertRoundTrip(ts1, DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+
+        // Zero nanos
+        TimestampNtz ts2 = TimestampNtz.fromMillis(1748662955428L, 0);
+        assertRoundTrip(ts2, DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+
+        // Zero millis-of-second with nanos
+        TimestampNtz ts3 = TimestampNtz.fromMillis(1748662955000L, 99988);
+        assertRoundTrip(ts3, DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+
+        // Zero millis-of-second and zero nanos
+        TimestampNtz ts4 = TimestampNtz.fromMillis(1748662955000L, 0);
+        assertRoundTrip(ts4, DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+
+        // Negative millis
+        TimestampNtz ts5 = TimestampNtz.fromMillis(-1748662955428L, 99988);
+        assertRoundTrip(ts5, DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+    }
+
+    @Test
+    void testRoundTripTimestampLtz() {
+        // With nanos
+        TimestampLtz ts1 = TimestampLtz.fromEpochMillis(1748662955428L, 99988);
+        assertRoundTrip(ts1, DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+
+        // Zero nanos
+        TimestampLtz ts2 = TimestampLtz.fromEpochMillis(1748662955428L, 0);
+        assertRoundTrip(ts2, DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+
+        // Negative millis
+        TimestampLtz ts3 = TimestampLtz.fromEpochMillis(-1748662955428L, 
99988);
+        assertRoundTrip(ts3, DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+    }
+
+    /**
+     * Formats {@code originalValue} with {@code convertValueOfType}, parses the result with {@code
+     * parseValueOfType} for the same {@code type}, and asserts that the parsed object equals the
+     * original.
+     */
+    private void assertRoundTrip(Object originalValue, DataTypeRoot type) {
+        String str = convertValueOfType(originalValue, type);
+        Object parsed = parseValueOfType(str, type);
+        assertThat(parsed).isEqualTo(originalValue);
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java
index 8472b825b..8ac7aabe4 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java
@@ -23,6 +23,7 @@ import org.apache.fluss.lake.writer.LakeWriter;
 import org.apache.fluss.lake.writer.WriterInitContext;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.types.RowType;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
@@ -52,6 +53,7 @@ public class PaimonLakeWriter implements 
LakeWriter<PaimonWriteResult> {
                         
writerInitContext.tableInfo().getTableConfig().isDataLakeAutoCompaction());
 
         List<String> partitionKeys = fileStoreTable.partitionKeys();
+        RowType flussRowType = writerInitContext.tableInfo().getRowType();
 
         this.recordWriter =
                 fileStoreTable.primaryKeys().isEmpty()
@@ -59,12 +61,14 @@ public class PaimonLakeWriter implements 
LakeWriter<PaimonWriteResult> {
                                 fileStoreTable,
                                 writerInitContext.tableBucket(),
                                 writerInitContext.partition(),
-                                partitionKeys)
+                                partitionKeys,
+                                flussRowType)
                         : new MergeTreeWriter(
                                 fileStoreTable,
                                 writerInitContext.tableBucket(),
                                 writerInitContext.partition(),
-                                partitionKeys);
+                                partitionKeys,
+                                flussRowType);
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
index 413b14189..bf509bc32 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
@@ -17,8 +17,13 @@
 
 package org.apache.fluss.lake.paimon.tiering;
 
+import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
+import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.types.DataTypeRoot;
+import org.apache.fluss.utils.PartitionUtils;
 
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.table.sink.CommitMessage;
@@ -38,7 +43,7 @@ public abstract class RecordWriter<T> implements 
AutoCloseable {
     protected final RowType tableRowType;
     protected final int bucket;
     protected final List<String> partitionKeys;
-    @Nullable protected BinaryRow partition;
+    protected final BinaryRow partition;
     protected final FlussRecordAsPaimonRow flussRecordAsPaimonRow;
 
     public RecordWriter(
@@ -46,14 +51,18 @@ public abstract class RecordWriter<T> implements 
AutoCloseable {
             RowType tableRowType,
             TableBucket tableBucket,
             @Nullable String partition,
-            List<String> partitionKeys) {
+            List<String> partitionKeys,
+            org.apache.fluss.types.RowType flussRowType) {
         this.tableWrite = tableWrite;
         this.tableRowType = tableRowType;
         this.bucket = tableBucket.getBucket();
         this.partitionKeys = partitionKeys;
-        // set partition to EMPTY_ROW in advance for non-partitioned table
         if (partition == null || partitionKeys.isEmpty()) {
+            // non-partitioned table
             this.partition = BinaryRow.EMPTY_ROW;
+        } else {
+            // eagerly resolve BinaryRow partition from partition name string
+            this.partition = resolvePartition(partition, partitionKeys, 
flussRowType);
         }
         this.flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket.getBucket(), 
tableRowType);
@@ -73,4 +82,38 @@ public abstract class RecordWriter<T> implements 
AutoCloseable {
     public void close() throws Exception {
         tableWrite.close();
     }
+
+    /**
+     * Builds the Paimon {@link BinaryRow} partition for a Fluss partition name.
+     *
+     * <p>The partition name is split into one string value per partition key, each value is
+     * parsed into its typed Fluss representation according to the column type in {@code
+     * flussRowType}, and the typed values are placed into an otherwise empty synthetic row. That
+     * row is wrapped as a Paimon row and handed to {@code tableWrite.getPartition}.
+     *
+     * @param partitionName the Fluss partition name string
+     * @param partitionKeys the partition column names, in partition-key order
+     * @param flussRowType the Fluss row type used to look up each key's field index and type
+     * @return the partition row computed by the table write for the synthetic row
+     */
+    private BinaryRow resolvePartition(
+            String partitionName,
+            List<String> partitionKeys,
+            org.apache.fluss.types.RowType flussRowType) {
+        List<String> values =
+                ResolvedPartitionSpec.fromPartitionName(partitionKeys, partitionName)
+                        .getPartitionValues();
+
+        // The synthetic row is sized by the Paimon RowType (business columns + system columns)
+        // so that FlussRowAsPaimonRow aligns with the Paimon schema; only the partition columns
+        // are populated, each at its field index in the Fluss row type.
+        GenericRow syntheticRow = new GenericRow(tableRowType.getFieldCount());
+
+        int keyPosition = 0;
+        for (String key : partitionKeys) {
+            int columnIndex = flussRowType.getFieldIndex(key);
+            checkState(columnIndex >= 0, "Partition key '%s' not found in Fluss row type.", key);
+            DataTypeRoot columnType = flussRowType.getTypeAt(columnIndex).getTypeRoot();
+            String rawValue = values.get(keyPosition);
+            syntheticRow.setField(
+                    columnIndex, PartitionUtils.parseValueOfType(rawValue, columnType));
+            keyPosition++;
+        }
+
+        return tableWrite.getPartition(new FlussRowAsPaimonRow(syntheticRow, tableRowType));
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
index 7daec20b7..a0966dfd4 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
@@ -20,6 +20,7 @@ package org.apache.fluss.lake.paimon.tiering.append;
 import org.apache.fluss.lake.paimon.tiering.RecordWriter;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.types.RowType;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.table.BucketMode;
@@ -41,7 +42,8 @@ public class AppendOnlyWriter extends 
RecordWriter<InternalRow> {
             FileStoreTable fileStoreTable,
             TableBucket tableBucket,
             @Nullable String partition,
-            List<String> partitionKeys) {
+            List<String> partitionKeys,
+            RowType flussRowType) {
         //noinspection unchecked
         super(
                 (TableWriteImpl<InternalRow>)
@@ -50,7 +52,8 @@ public class AppendOnlyWriter extends 
RecordWriter<InternalRow> {
                 fileStoreTable.rowType(),
                 tableBucket,
                 partition,
-                partitionKeys); // Pass to parent
+                partitionKeys,
+                flussRowType);
         this.fileStoreTable = fileStoreTable;
     }
 
@@ -58,11 +61,6 @@ public class AppendOnlyWriter extends 
RecordWriter<InternalRow> {
     public void write(LogRecord record) throws Exception {
         flussRecordAsPaimonRow.setFlussRecord(record);
 
-        // get partition once
-        if (partition == null) {
-            partition = tableWrite.getPartition(flussRecordAsPaimonRow);
-        }
-
         // hacky, call internal method tableWrite.getWrite() to support
         // to write to given partition, otherwise, it'll always extract a 
partition from Paimon row
         // which may be costly
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
index 95a527518..0778d73d0 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
@@ -20,6 +20,7 @@ package org.apache.fluss.lake.paimon.tiering.mergetree;
 import org.apache.fluss.lake.paimon.tiering.RecordWriter;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.types.RowType;
 
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.disk.IOManager;
@@ -49,13 +50,15 @@ public class MergeTreeWriter extends RecordWriter<KeyValue> 
{
             FileStoreTable fileStoreTable,
             TableBucket tableBucket,
             @Nullable String partition,
-            List<String> partitionKeys) {
+            List<String> partitionKeys,
+            RowType flussRowType) {
         super(
                 createTableWrite(fileStoreTable),
                 fileStoreTable.rowType(),
                 tableBucket,
                 partition,
-                partitionKeys);
+                partitionKeys,
+                flussRowType);
         this.rowKeyExtractor = fileStoreTable.createRowKeyExtractor();
     }
 
@@ -78,11 +81,6 @@ public class MergeTreeWriter extends RecordWriter<KeyValue> {
     public void write(LogRecord record) throws Exception {
         flussRecordAsPaimonRow.setFlussRecord(record);
 
-        // get partition once
-        if (partition == null) {
-            partition = tableWrite.getPartition(flussRecordAsPaimonRow);
-        }
-
         rowKeyExtractor.setRecord(flussRecordAsPaimonRow);
         keyValue.replace(
                 rowKeyExtractor.trimmedPrimaryKey(),

Reply via email to