This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new f8885f1d3 [lake/paimon] Resolve partition eagerly in record writer
(#3023)
f8885f1d3 is described below
commit f8885f1d3158a47e0d22b163cfe202c2aa4d8504
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Apr 8 21:09:37 2026 +0800
[lake/paimon] Resolve partition eagerly in record writer (#3023)
---
.../fluss/utils/PartitionNameConverters.java | 138 +++++++++++++++++++++
.../org/apache/fluss/utils/PartitionUtils.java | 51 ++++++++
.../org/apache/fluss/utils/PartitionUtilsTest.java | 130 +++++++++++++++++++
.../lake/paimon/tiering/PaimonLakeWriter.java | 8 +-
.../fluss/lake/paimon/tiering/RecordWriter.java | 49 +++++++-
.../paimon/tiering/append/AppendOnlyWriter.java | 12 +-
.../paimon/tiering/mergetree/MergeTreeWriter.java | 12 +-
7 files changed, 381 insertions(+), 19 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionNameConverters.java
b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionNameConverters.java
index e12a68d21..984cd6a0a 100644
---
a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionNameConverters.java
+++
b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionNameConverters.java
@@ -21,6 +21,7 @@ import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
@@ -130,4 +131,141 @@ public class PartitionNameConverters {
.atOffset(ZoneOffset.UTC)
.format(TimestampFormatter);
}
+
+ // ---- Reverse parsing methods (inverse of the above formatting methods)
----
+
+ /** Parses a hex string back to byte array. Reverse of {@link
#hexString(byte[])}. */
+ public static byte[] parseHexString(String hex) {
+ int len = hex.length();
+ byte[] bytes = new byte[len / 2];
+ for (int i = 0; i < len; i += 2) {
+ bytes[i / 2] =
+ (byte)
+ ((Character.digit(hex.charAt(i), 16) << 4)
+ + Character.digit(hex.charAt(i + 1), 16));
+ }
+ return bytes;
+ }
+
+ /**
+ * Parses "YYYY-MM-DD" date string back to days since epoch. Reverse of
{@link
+ * #dayToString(int)}.
+ */
+ public static int parseDayString(String dayStr) {
+ String[] parts = dayStr.split("-");
+ int year = Integer.parseInt(parts[0]);
+ int month = Integer.parseInt(parts[1]);
+ int day = Integer.parseInt(parts[2]);
+ LocalDateTime date = LocalDateTime.of(year, month, day, 0, 0);
+ long epochDay = date.toLocalDate().toEpochDay();
+ return (int) epochDay;
+ }
+
+ /**
+ * Parses "HH-MM-SS_mmm" time string back to milliseconds of day. Reverse
of {@link
+ * #milliToString(int)}.
+ */
+ public static int parseMilliString(String timeStr) {
+ String[] mainParts = timeStr.split("_");
+ String[] timeParts = mainParts[0].split("-");
+ int hour = Integer.parseInt(timeParts[0]);
+ int min = Integer.parseInt(timeParts[1]);
+ int sec = Integer.parseInt(timeParts[2]);
+ int millis = Integer.parseInt(mainParts[1]);
+ return hour * 3600000 + min * 60000 + sec * 1000 + millis;
+ }
+
+ /** Parses reformatted float string back to Float. Reverse of {@link
#reformatFloat(Float)}. */
+ public static Float parseFloat(String value) {
+ if ("NaN".equals(value)) {
+ return Float.NaN;
+ } else if ("Inf".equals(value)) {
+ return Float.POSITIVE_INFINITY;
+ } else if ("-Inf".equals(value)) {
+ return Float.NEGATIVE_INFINITY;
+ } else {
+ return java.lang.Float.parseFloat(value.replace("_", "."));
+ }
+ }
+
+ /**
+ * Parses reformatted double string back to Double. Reverse of {@link
#reformatDouble(Double)}.
+ */
+ public static Double parseDouble(String value) {
+ if ("NaN".equals(value)) {
+ return Double.NaN;
+ } else if ("Inf".equals(value)) {
+ return Double.POSITIVE_INFINITY;
+ } else if ("-Inf".equals(value)) {
+ return Double.NEGATIVE_INFINITY;
+ } else {
+ return java.lang.Double.parseDouble(value.replace("_", "."));
+ }
+ }
+
+ /**
+ * Parses timestamp string back to TimestampNtz. Reverse of {@link
+ * #timestampToString(TimestampNtz)}.
+ *
+ * <p>Format: "yyyy-MM-dd-HH-mm-ss_nnnnnnnnn" where the nano part has 0-9
digits.
+ */
+ public static TimestampNtz parseTimestampNtz(String value) {
+ long[] millisAndNano = parseTimestampString(value);
+ return TimestampNtz.fromMillis(millisAndNano[0], (int)
millisAndNano[1]);
+ }
+
+ /**
+ * Parses timestamp string back to TimestampLtz. Reverse of {@link
+ * #timestampToString(TimestampLtz)}.
+ */
+ public static TimestampLtz parseTimestampLtz(String value) {
+ long[] millisAndNano = parseTimestampString(value);
+ return TimestampLtz.fromEpochMillis(millisAndNano[0], (int)
millisAndNano[1]);
+ }
+
+ /**
+ * Parses a timestamp string into [epochMillis, nanoOfMillisecond].
+ *
+ * <p>Format: "yyyy-MM-dd-HH-mm-ss_nnnnnnnnn" (the time and nano parts are
optional).
+ */
+ private static long[] parseTimestampString(String value) {
+ int underscoreIdx = value.lastIndexOf('_');
+ String dateTimePart;
+ String nanoPart;
+ if (underscoreIdx >= 0) {
+ dateTimePart = value.substring(0, underscoreIdx);
+ nanoPart = value.substring(underscoreIdx + 1);
+ } else {
+ dateTimePart = value;
+ nanoPart = "";
+ }
+
+ String[] parts = dateTimePart.split("-");
+ int year = Integer.parseInt(parts[0]);
+ int month = Integer.parseInt(parts[1]);
+ int day = Integer.parseInt(parts[2]);
+ int hour = parts.length > 3 ? Integer.parseInt(parts[3]) : 0;
+ int min = parts.length > 4 ? Integer.parseInt(parts[4]) : 0;
+ int sec = parts.length > 5 ? Integer.parseInt(parts[5]) : 0;
+
+ LocalDateTime dateTime = LocalDateTime.of(year, month, day, hour, min,
sec);
+ long epochMillis =
+ dateTime.toLocalDate().toEpochDay() * 86400000L
+ + dateTime.toLocalTime().toSecondOfDay() * 1000L;
+
+ int nanoOfMillisecond = 0;
+ if (!nanoPart.isEmpty()) {
+ // Pad to 9 digits (nano of second)
+ StringBuilder sb = new StringBuilder(nanoPart);
+ while (sb.length() < 9) {
+ sb.append('0');
+ }
+ long nanoOfSecond = Long.parseLong(sb.toString());
+ // First 3 digits are millis-of-second, rest are nano-of-millis
+ epochMillis += nanoOfSecond / 1_000_000;
+ nanoOfMillisecond = (int) (nanoOfSecond % 1_000_000);
+ }
+
+ return new long[] {epochMillis, nanoOfMillisecond};
+ }
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
index 47df48318..6474d2ce9 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
@@ -233,6 +233,57 @@ public class PartitionUtils {
return DateTimeFormatter.ofPattern(format).format(zonedDateTime);
}
+ /**
+ * Parses a partition value string back to its typed Fluss internal
representation. This is the
+ * reverse operation of {@link #convertValueOfType(Object, DataTypeRoot)}.
+ *
+ * @param value the string representation of the partition value
+ * @param type the data type root of the partition column
+ * @return the typed value as a Fluss internal data structure
+ */
+ public static Object parseValueOfType(String value, DataTypeRoot type) {
+ switch (type) {
+ case CHAR:
+ case STRING:
+ return BinaryString.fromString(value);
+ case BOOLEAN:
+ if ("true".equalsIgnoreCase(value)) {
+ return true;
+ } else if ("false".equalsIgnoreCase(value)) {
+ return false;
+ }
+ throw new IllegalArgumentException(
+ "Invalid boolean partition value: '"
+ + value
+ + "'. Expected 'true' or 'false'.");
+ case BINARY:
+ case BYTES:
+ return PartitionNameConverters.parseHexString(value);
+ case TINYINT:
+ return Byte.parseByte(value);
+ case SMALLINT:
+ return Short.parseShort(value);
+ case INTEGER:
+ return Integer.parseInt(value);
+ case BIGINT:
+ return Long.parseLong(value);
+ case DATE:
+ return PartitionNameConverters.parseDayString(value);
+ case TIME_WITHOUT_TIME_ZONE:
+ return PartitionNameConverters.parseMilliString(value);
+ case FLOAT:
+ return PartitionNameConverters.parseFloat(value);
+ case DOUBLE:
+ return PartitionNameConverters.parseDouble(value);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return PartitionNameConverters.parseTimestampNtz(value);
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return PartitionNameConverters.parseTimestampLtz(value);
+ default:
+ throw new IllegalArgumentException("Unsupported DataTypeRoot:
" + type);
+ }
+ }
+
public static String convertValueOfType(Object value, DataTypeRoot type) {
String stringPartitionKey = "";
switch (type) {
diff --git
a/fluss-common/src/test/java/org/apache/fluss/utils/PartitionUtilsTest.java
b/fluss-common/src/test/java/org/apache/fluss/utils/PartitionUtilsTest.java
index dae176265..aeb2c1d7c 100644
--- a/fluss-common/src/test/java/org/apache/fluss/utils/PartitionUtilsTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/utils/PartitionUtilsTest.java
@@ -43,6 +43,7 @@ import static
org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
import static org.apache.fluss.utils.PartitionUtils.convertValueOfType;
import static org.apache.fluss.utils.PartitionUtils.generateAutoPartition;
+import static org.apache.fluss.utils.PartitionUtils.parseValueOfType;
import static org.apache.fluss.utils.PartitionUtils.validatePartitionSpec;
import static org.apache.fluss.utils.PartitionUtils.validatePartitionValues;
import static org.assertj.core.api.Assertions.assertThat;
@@ -424,4 +425,133 @@ class PartitionUtilsTest {
String detectInvalid = detectInvalidName(toStringResult);
assertThat(detectInvalid).isEqualTo(null);
}
+
+ // ---- Round-trip tests for parseValueOfType ----
+
+ @Test
+ void testRoundTripString() {
+ assertRoundTrip(BinaryString.fromString("Fluss"), DataTypeRoot.STRING);
+ }
+
+ @Test
+ void testRoundTripChar() {
+ assertRoundTrip(BinaryString.fromString("F"), DataTypeRoot.CHAR);
+ }
+
+ @Test
+ void testRoundTripBoolean() {
+ assertRoundTrip(true, DataTypeRoot.BOOLEAN);
+ assertRoundTrip(false, DataTypeRoot.BOOLEAN);
+ }
+
+ @Test
+ void testRoundTripBytes() {
+ byte[] value = new byte[] {0x10, 0x20, 0x30, 0x40, 0x50, (byte) 0xFF};
+ String str = convertValueOfType(value, DataTypeRoot.BYTES);
+ byte[] parsed = (byte[]) parseValueOfType(str, DataTypeRoot.BYTES);
+ assertThat(parsed).isEqualTo(value);
+ }
+
+ @Test
+ void testRoundTripBinary() {
+ byte[] value = new byte[] {0x10, 0x20, 0x30, 0x40, 0x50, (byte) 0xFF};
+ String str = convertValueOfType(value, DataTypeRoot.BINARY);
+ byte[] parsed = (byte[]) parseValueOfType(str, DataTypeRoot.BINARY);
+ assertThat(parsed).isEqualTo(value);
+ }
+
+ @Test
+ void testRoundTripTinyInt() {
+ assertRoundTrip((byte) 100, DataTypeRoot.TINYINT);
+ assertRoundTrip((byte) -100, DataTypeRoot.TINYINT);
+ }
+
+ @Test
+ void testRoundTripSmallInt() {
+ assertRoundTrip((short) -32760, DataTypeRoot.SMALLINT);
+ }
+
+ @Test
+ void testRoundTripInteger() {
+ assertRoundTrip(299000, DataTypeRoot.INTEGER);
+ assertRoundTrip(-299000, DataTypeRoot.INTEGER);
+ }
+
+ @Test
+ void testRoundTripBigInt() {
+ assertRoundTrip(1748662955428L, DataTypeRoot.BIGINT);
+ assertRoundTrip(-1748662955428L, DataTypeRoot.BIGINT);
+ }
+
+ @Test
+ void testRoundTripDate() {
+ assertRoundTrip(20235, DataTypeRoot.DATE);
+ assertRoundTrip(0, DataTypeRoot.DATE);
+ }
+
+ @Test
+ void testRoundTripTime() {
+ assertRoundTrip(5402199, DataTypeRoot.TIME_WITHOUT_TIME_ZONE);
+ assertRoundTrip(0, DataTypeRoot.TIME_WITHOUT_TIME_ZONE);
+ }
+
+ @Test
+ void testRoundTripFloat() {
+ assertRoundTrip(5.73f, DataTypeRoot.FLOAT);
+ assertRoundTrip(Float.NaN, DataTypeRoot.FLOAT);
+ assertRoundTrip(Float.POSITIVE_INFINITY, DataTypeRoot.FLOAT);
+ assertRoundTrip(Float.NEGATIVE_INFINITY, DataTypeRoot.FLOAT);
+ }
+
+ @Test
+ void testRoundTripDouble() {
+ assertRoundTrip(5.73, DataTypeRoot.DOUBLE);
+ assertRoundTrip(Double.NaN, DataTypeRoot.DOUBLE);
+ assertRoundTrip(Double.POSITIVE_INFINITY, DataTypeRoot.DOUBLE);
+ assertRoundTrip(Double.NEGATIVE_INFINITY, DataTypeRoot.DOUBLE);
+ }
+
+ @Test
+ void testRoundTripTimestampNtz() {
+ // With nanos
+ TimestampNtz ts1 = TimestampNtz.fromMillis(1748662955428L, 99988);
+ assertRoundTrip(ts1, DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+
+ // Zero nanos
+ TimestampNtz ts2 = TimestampNtz.fromMillis(1748662955428L, 0);
+ assertRoundTrip(ts2, DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+
+ // Zero millis-of-second with nanos
+ TimestampNtz ts3 = TimestampNtz.fromMillis(1748662955000L, 99988);
+ assertRoundTrip(ts3, DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+
+ // Zero millis-of-second and zero nanos
+ TimestampNtz ts4 = TimestampNtz.fromMillis(1748662955000L, 0);
+ assertRoundTrip(ts4, DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+
+ // Negative millis
+ TimestampNtz ts5 = TimestampNtz.fromMillis(-1748662955428L, 99988);
+ assertRoundTrip(ts5, DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+ }
+
+ @Test
+ void testRoundTripTimestampLtz() {
+ // With nanos
+ TimestampLtz ts1 = TimestampLtz.fromEpochMillis(1748662955428L, 99988);
+ assertRoundTrip(ts1, DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+
+ // Zero nanos
+ TimestampLtz ts2 = TimestampLtz.fromEpochMillis(1748662955428L, 0);
+ assertRoundTrip(ts2, DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+
+ // Negative millis
+ TimestampLtz ts3 = TimestampLtz.fromEpochMillis(-1748662955428L,
99988);
+ assertRoundTrip(ts3, DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+ }
+
+ private void assertRoundTrip(Object originalValue, DataTypeRoot type) {
+ String str = convertValueOfType(originalValue, type);
+ Object parsed = parseValueOfType(str, type);
+ assertThat(parsed).isEqualTo(originalValue);
+ }
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java
index 8472b825b..8ac7aabe4 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java
@@ -23,6 +23,7 @@ import org.apache.fluss.lake.writer.LakeWriter;
import org.apache.fluss.lake.writer.WriterInitContext;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.types.RowType;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
@@ -52,6 +53,7 @@ public class PaimonLakeWriter implements
LakeWriter<PaimonWriteResult> {
writerInitContext.tableInfo().getTableConfig().isDataLakeAutoCompaction());
List<String> partitionKeys = fileStoreTable.partitionKeys();
+ RowType flussRowType = writerInitContext.tableInfo().getRowType();
this.recordWriter =
fileStoreTable.primaryKeys().isEmpty()
@@ -59,12 +61,14 @@ public class PaimonLakeWriter implements
LakeWriter<PaimonWriteResult> {
fileStoreTable,
writerInitContext.tableBucket(),
writerInitContext.partition(),
- partitionKeys)
+ partitionKeys,
+ flussRowType)
: new MergeTreeWriter(
fileStoreTable,
writerInitContext.tableBucket(),
writerInitContext.partition(),
- partitionKeys);
+ partitionKeys,
+ flussRowType);
}
@Override
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
index 413b14189..bf509bc32 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
@@ -17,8 +17,13 @@
package org.apache.fluss.lake.paimon.tiering;
+import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
+import org.apache.fluss.metadata.ResolvedPartitionSpec;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.types.DataTypeRoot;
+import org.apache.fluss.utils.PartitionUtils;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.table.sink.CommitMessage;
@@ -38,7 +43,7 @@ public abstract class RecordWriter<T> implements
AutoCloseable {
protected final RowType tableRowType;
protected final int bucket;
protected final List<String> partitionKeys;
- @Nullable protected BinaryRow partition;
+ protected final BinaryRow partition;
protected final FlussRecordAsPaimonRow flussRecordAsPaimonRow;
public RecordWriter(
@@ -46,14 +51,18 @@ public abstract class RecordWriter<T> implements
AutoCloseable {
RowType tableRowType,
TableBucket tableBucket,
@Nullable String partition,
- List<String> partitionKeys) {
+ List<String> partitionKeys,
+ org.apache.fluss.types.RowType flussRowType) {
this.tableWrite = tableWrite;
this.tableRowType = tableRowType;
this.bucket = tableBucket.getBucket();
this.partitionKeys = partitionKeys;
- // set partition to EMPTY_ROW in advance for non-partitioned table
if (partition == null || partitionKeys.isEmpty()) {
+ // non-partitioned table
this.partition = BinaryRow.EMPTY_ROW;
+ } else {
+ // eagerly resolve BinaryRow partition from partition name string
+ this.partition = resolvePartition(partition, partitionKeys,
flussRowType);
}
this.flussRecordAsPaimonRow =
new FlussRecordAsPaimonRow(tableBucket.getBucket(),
tableRowType);
@@ -73,4 +82,38 @@ public abstract class RecordWriter<T> implements
AutoCloseable {
public void close() throws Exception {
tableWrite.close();
}
+
+ /**
+ * Resolves a Paimon {@link BinaryRow} partition from the partition name
string by parsing each
+ * partition value to its typed Fluss representation, constructing a
synthetic row, and
+ * delegating to Paimon's partition extraction.
+ */
+ private BinaryRow resolvePartition(
+ String partitionName,
+ List<String> partitionKeys,
+ org.apache.fluss.types.RowType flussRowType) {
+ ResolvedPartitionSpec spec =
+ ResolvedPartitionSpec.fromPartitionName(partitionKeys,
partitionName);
+ List<String> partitionValues = spec.getPartitionValues();
+
+ // Build a GenericRow with partition column values at their correct
positions.
+ // The row field count must match the Paimon RowType (business columns
+ system columns)
+ // so that FlussRowAsPaimonRow aligns with the Paimon schema.
+ GenericRow partitionRow = new GenericRow(tableRowType.getFieldCount());
+
+ for (int i = 0; i < partitionKeys.size(); i++) {
+ String partitionKey = partitionKeys.get(i);
+ int fieldIndex = flussRowType.getFieldIndex(partitionKey);
+ checkState(
+ fieldIndex >= 0,
+ "Partition key '%s' not found in Fluss row type.",
+ partitionKey);
+ DataTypeRoot typeRoot =
flussRowType.getTypeAt(fieldIndex).getTypeRoot();
+ Object typedValue =
PartitionUtils.parseValueOfType(partitionValues.get(i), typeRoot);
+ partitionRow.setField(fieldIndex, typedValue);
+ }
+
+ FlussRowAsPaimonRow paimonRow = new FlussRowAsPaimonRow(partitionRow,
tableRowType);
+ return tableWrite.getPartition(paimonRow);
+ }
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
index 7daec20b7..a0966dfd4 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
@@ -20,6 +20,7 @@ package org.apache.fluss.lake.paimon.tiering.append;
import org.apache.fluss.lake.paimon.tiering.RecordWriter;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.types.RowType;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.table.BucketMode;
@@ -41,7 +42,8 @@ public class AppendOnlyWriter extends
RecordWriter<InternalRow> {
FileStoreTable fileStoreTable,
TableBucket tableBucket,
@Nullable String partition,
- List<String> partitionKeys) {
+ List<String> partitionKeys,
+ RowType flussRowType) {
//noinspection unchecked
super(
(TableWriteImpl<InternalRow>)
@@ -50,7 +52,8 @@ public class AppendOnlyWriter extends
RecordWriter<InternalRow> {
fileStoreTable.rowType(),
tableBucket,
partition,
- partitionKeys); // Pass to parent
+ partitionKeys,
+ flussRowType);
this.fileStoreTable = fileStoreTable;
}
@@ -58,11 +61,6 @@ public class AppendOnlyWriter extends
RecordWriter<InternalRow> {
public void write(LogRecord record) throws Exception {
flussRecordAsPaimonRow.setFlussRecord(record);
- // get partition once
- if (partition == null) {
- partition = tableWrite.getPartition(flussRecordAsPaimonRow);
- }
-
// hacky, call internal method tableWrite.getWrite() to support
// to write to given partition, otherwise, it'll always extract a
partition from Paimon row
// which may be costly
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
index 95a527518..0778d73d0 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
@@ -20,6 +20,7 @@ package org.apache.fluss.lake.paimon.tiering.mergetree;
import org.apache.fluss.lake.paimon.tiering.RecordWriter;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.types.RowType;
import org.apache.paimon.KeyValue;
import org.apache.paimon.disk.IOManager;
@@ -49,13 +50,15 @@ public class MergeTreeWriter extends RecordWriter<KeyValue>
{
FileStoreTable fileStoreTable,
TableBucket tableBucket,
@Nullable String partition,
- List<String> partitionKeys) {
+ List<String> partitionKeys,
+ RowType flussRowType) {
super(
createTableWrite(fileStoreTable),
fileStoreTable.rowType(),
tableBucket,
partition,
- partitionKeys);
+ partitionKeys,
+ flussRowType);
this.rowKeyExtractor = fileStoreTable.createRowKeyExtractor();
}
@@ -78,11 +81,6 @@ public class MergeTreeWriter extends RecordWriter<KeyValue> {
public void write(LogRecord record) throws Exception {
flussRecordAsPaimonRow.setFlussRecord(record);
- // get partition once
- if (partition == null) {
- partition = tableWrite.getPartition(flussRecordAsPaimonRow);
- }
-
rowKeyExtractor.setRecord(flussRecordAsPaimonRow);
keyValue.replace(
rowKeyExtractor.trimmedPrimaryKey(),