This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
     new 3faa663b5 [lake/paimon] Paimon lake table support non-string partition keys (#1817)
3faa663b5 is described below
commit 3faa663b57ee04d85fd9b703b17f611ff8bf8c95
Author: Liebing <[email protected]>
AuthorDate: Thu Oct 16 19:58:20 2025 +0800
[lake/paimon] Paimon lake table support non-string partition keys (#1817)
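
    Previously the tiering path built the Paimon partition BinaryRow from the
    partition name via toPaimonPartitionBinaryRow, which wrote every partition
    value with BinaryString.fromString and therefore only supported STRING
    partition columns. The writers now derive the partition lazily from the
    first written record via tableWrite.getPartition(...), and PaimonLakeCatalog
    sets partition.legacy-name to false by default so non-string partition
    values are rendered consistently in partition names. A parameterized IT case
    covers tiering with all supported partition key types (decimal is not
    supported as a partition key).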
---
.../fluss/lake/paimon/PaimonLakeCatalog.java | 9 ++
.../fluss/lake/paimon/tiering/RecordWriter.java | 10 +-
.../paimon/tiering/append/AppendOnlyWriter.java | 6 +
.../paimon/tiering/mergetree/MergeTreeWriter.java | 6 +
.../fluss/lake/paimon/utils/PaimonConversions.java | 31 ----
.../lake/paimon/tiering/PaimonTieringITCase.java | 161 +++++++++++++++++++++
6 files changed, 189 insertions(+), 34 deletions(-)
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
index f84b6e590..03ffd0cdc 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -157,6 +157,9 @@ public class PaimonLakeCatalog implements LakeCatalog {
Schema.Builder schemaBuilder = Schema.newBuilder();
Options options = new Options();
+ // set default properties
+ setPaimonDefaultProperties(options);
+
// When bucket key is undefined, it should use dynamic bucket (bucket = -1) mode.
List<String> bucketKeys = tableDescriptor.getBucketKeys();
if (!bucketKeys.isEmpty()) {
@@ -215,6 +218,12 @@ public class PaimonLakeCatalog implements LakeCatalog {
return schemaBuilder.build();
}
+ private void setPaimonDefaultProperties(Options options) {
+ // set partition.legacy-name to false, otherwise Paimon will use toString for all types,
+ // which would cause inconsistent partition values for the same binary value
+ options.set(CoreOptions.PARTITION_GENERATE_LEGCY_NAME, false);
+ }
+
private void setFlussPropertyToPaimon(String key, String value, Options options) {
if (key.startsWith(PAIMON_CONF_PREFIX)) {
options.set(key.substring(PAIMON_CONF_PREFIX.length()), value);
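
For context: CoreOptions.PARTITION_GENERATE_LEGCY_NAME is the Paimon option behind
the partition.legacy-name key mentioned in the comment above. With it set to false,
the IT case added below observes type-aware partition names such as c11=7633 (the
UTF-8 bytes of "v3" rendered as hex) and c13=2025-10-16, rather than toString-based
values.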
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
index f835b0486..413b14189 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
@@ -29,7 +29,6 @@ import javax.annotation.Nullable;
import java.util.List;
-import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonPartitionBinaryRow;
import static org.apache.fluss.utils.Preconditions.checkState;
/** A base interface to write {@link LogRecord} to Paimon. */
@@ -38,7 +37,8 @@ public abstract class RecordWriter<T> implements AutoCloseable {
protected final TableWriteImpl<T> tableWrite;
protected final RowType tableRowType;
protected final int bucket;
- @Nullable protected final BinaryRow partition;
+ protected final List<String> partitionKeys;
+ @Nullable protected BinaryRow partition;
protected final FlussRecordAsPaimonRow flussRecordAsPaimonRow;
public RecordWriter(
@@ -50,7 +50,11 @@ public abstract class RecordWriter<T> implements AutoCloseable {
this.tableWrite = tableWrite;
this.tableRowType = tableRowType;
this.bucket = tableBucket.getBucket();
- this.partition = toPaimonPartitionBinaryRow(partitionKeys, partition);
+ this.partitionKeys = partitionKeys;
+ // set partition to EMPTY_ROW in advance for non-partitioned tables
+ if (partition == null || partitionKeys.isEmpty()) {
+ this.partition = BinaryRow.EMPTY_ROW;
+ }
this.flussRecordAsPaimonRow =
new FlussRecordAsPaimonRow(tableBucket.getBucket(), tableRowType);
}
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
index 2d571358d..7daec20b7 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
@@ -57,6 +57,12 @@ public class AppendOnlyWriter extends RecordWriter<InternalRow> {
@Override
public void write(LogRecord record) throws Exception {
flussRecordAsPaimonRow.setFlussRecord(record);
+
+ // get partition once
+ if (partition == null) {
+ partition = tableWrite.getPartition(flussRecordAsPaimonRow);
+ }
+
// hacky, call internal method tableWrite.getWrite() to support
// to write to given partition, otherwise, it'll always extract a partition from Paimon row
// which may be costly
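
For readers skimming the patch, a condensed sketch of the partition handling after
this change, assembled from the RecordWriter constructor and the write() method
above (MergeTreeWriter below applies the same pattern); it is illustrative, not a
verbatim excerpt:

    // RecordWriter constructor: non-partitioned tables get an empty partition row up front
    if (partition == null || partitionKeys.isEmpty()) {
        this.partition = BinaryRow.EMPTY_ROW;
    }

    // write(): for partitioned tables, derive the partition from the first record,
    // so Paimon converts non-string partition values using their declared types
    if (partition == null) {
        partition = tableWrite.getPartition(flussRecordAsPaimonRow);
    }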
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
index c27ebef6d..95a527518 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
@@ -77,6 +77,12 @@ public class MergeTreeWriter extends RecordWriter<KeyValue> {
@Override
public void write(LogRecord record) throws Exception {
flussRecordAsPaimonRow.setFlussRecord(record);
+
+ // get partition once
+ if (partition == null) {
+ partition = tableWrite.getPartition(flussRecordAsPaimonRow);
+ }
+
rowKeyExtractor.setRecord(flussRecordAsPaimonRow);
keyValue.replace(
rowKeyExtractor.trimmedPrimaryKey(),
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
index c456411e8..a9491659f 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
@@ -18,7 +18,6 @@
package org.apache.fluss.lake.paimon.utils;
import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
-import org.apache.fluss.metadata.ResolvedPartitionSpec;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.ChangeType;
@@ -26,16 +25,11 @@ import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.BinaryRowWriter;
-import org.apache.paimon.data.BinaryString;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
-import javax.annotation.Nullable;
-
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
@@ -78,31 +72,6 @@ public class PaimonConversions {
return Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName());
}
- public static BinaryRow toPaimonPartitionBinaryRow(
- List<String> partitionKeys, @Nullable String partitionName) {
- if (partitionName == null || partitionKeys.isEmpty()) {
- return BinaryRow.EMPTY_ROW;
- }
-
- // Fluss's existing utility
- ResolvedPartitionSpec resolvedPartitionSpec =
- ResolvedPartitionSpec.fromPartitionName(partitionKeys, partitionName);
-
- BinaryRow partitionBinaryRow = new BinaryRow(partitionKeys.size());
- BinaryRowWriter writer = new BinaryRowWriter(partitionBinaryRow);
-
- List<String> partitionValues = resolvedPartitionSpec.getPartitionValues();
- for (int i = 0; i < partitionKeys.size(); i++) {
- // Todo Currently, partition column must be String datatype, so we can always use
- // `BinaryString.fromString` to convert to Paimon's data structure. Revisit here when
- // #489 is finished.
- writer.writeString(i, BinaryString.fromString(partitionValues.get(i)));
- }
-
- writer.complete();
- return partitionBinaryRow;
- }
-
public static Object toPaimonLiteral(DataType dataType, Object flussLiteral) {
RowType rowType = RowType.of(dataType);
InternalRow flussRow = GenericRow.of(flussLiteral);
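
For reference, the removed helper could only handle STRING partition columns because
it wrote every partition value with BinaryString.fromString; a condensed view of its
core loop (not a verbatim excerpt, the ResolvedPartitionSpec lookup is omitted):

    BinaryRow partitionRow = new BinaryRow(partitionKeys.size());
    BinaryRowWriter writer = new BinaryRowWriter(partitionRow);
    for (int i = 0; i < partitionKeys.size(); i++) {
        // every value is written as a string, regardless of the column's declared type
        writer.writeString(i, BinaryString.fromString(partitionValues.get(i)));
    }
    writer.complete();

This limitation is why the writers now obtain the partition BinaryRow from Paimon's
own tableWrite.getPartition(...) instead.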
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index 3a85a2aa7..18748c1bf 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -17,15 +17,22 @@
package org.apache.fluss.lake.paimon.tiering;
+import org.apache.fluss.client.table.getter.PartitionGetter;
import org.apache.fluss.config.AutoPartitionTimeUnit;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
+import org.apache.fluss.metadata.PartitionInfo;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.utils.types.Tuple2;
@@ -40,8 +47,17 @@ import org.apache.paimon.utils.CloseableIterator;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -49,6 +65,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
import static org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
import static org.apache.fluss.testutils.DataTestUtils.row;
@@ -185,6 +202,150 @@ class PaimonTieringITCase extends FlinkPaimonTieringTestBase {
}
}
+ private static Stream<Arguments> tieringAllTypesWriteArgs() {
+ return Stream.of(Arguments.of(true), Arguments.of(false));
+ }
+
+ @ParameterizedTest
+ @MethodSource("tieringAllTypesWriteArgs")
+ void testTieringForAllTypes(boolean isPrimaryKeyTable) throws Exception {
+ // create a table, write some records and wait until snapshot finished
+ TablePath t1 =
+ TablePath.of(
+ DEFAULT_DB,
isPrimaryKeyTable ? "pkTableForAllTypes" : "logTableForAllTypes");
+ Schema.Builder builder =
+ Schema.newBuilder()
+ .column("c0", DataTypes.STRING())
+ .column("c1", DataTypes.BOOLEAN())
+ .column("c2", DataTypes.TINYINT())
+ .column("c3", DataTypes.SMALLINT())
+ .column("c4", DataTypes.INT())
+ .column("c5", DataTypes.BIGINT())
+ .column("c6", DataTypes.FLOAT())
+ .column("c7", DataTypes.DOUBLE())
+ // decimal is not supported as a partition key
+ .column("c8", DataTypes.DECIMAL(10, 2))
+ .column("c9", DataTypes.CHAR(10))
+ .column("c10", DataTypes.STRING())
+ .column("c11", DataTypes.BYTES())
+ .column("c12", DataTypes.BINARY(5))
+ .column("c13", DataTypes.DATE())
+ .column("c14", DataTypes.TIME(6))
+ .column("c15", DataTypes.TIMESTAMP(6))
+ .column("c16", DataTypes.TIMESTAMP_LTZ(6));
+ if (isPrimaryKeyTable) {
+ builder.primaryKey(
+ "c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c9", "c10", "c11", "c12",
+ "c13", "c14", "c15", "c16");
+ }
+ List<String> partitionKeys =
+ Arrays.asList(
+ "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c9", "c10", "c11", "c12", "c13",
+ "c14", "c15", "c16");
+ TableDescriptor.Builder tableDescriptor =
+ TableDescriptor.builder()
+ .schema(builder.build())
+ .distributedBy(1, "c0")
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
+ .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
+ tableDescriptor.partitionedBy(partitionKeys);
+ tableDescriptor.customProperties(Collections.emptyMap());
+ tableDescriptor.properties(Collections.emptyMap());
+ long t1Id = createTable(t1, tableDescriptor.build());
+
+ // write records
+ List<InternalRow> rows =
+ Collections.singletonList(
+ row(
+ BinaryString.fromString("v0"),
+ true,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 5.0f,
+ 6.0,
+ Decimal.fromBigDecimal(new BigDecimal("0.09"), 10, 2),
+ BinaryString.fromString("v1"),
+ BinaryString.fromString("v2"),
+ "v3".getBytes(StandardCharsets.UTF_8),
+ new byte[] {1, 2, 3, 4, 5},
+ (int) LocalDate.of(2025, 10, 16).toEpochDay(),
+ (int) (LocalTime.of(10, 10, 10, 123000000).toNanoOfDay() / 1_000_000),
+ TimestampNtz.fromLocalDateTime(
+ LocalDateTime.of(2025, 10, 16, 10, 10, 10, 123000000)),
+ TimestampLtz.fromInstant(
+ Instant.parse("2025-10-16T10:10:10.123Z"))));
+ writeRows(t1, rows, !isPrimaryKeyTable);
+
+ TableInfo tableInfo = admin.getTableInfo(t1).get();
+ List<PartitionInfo> partitionInfos = admin.listPartitionInfos(t1).get();
+ assertThat(partitionInfos.size()).isEqualTo(1);
+ PartitionGetter partitionGetter =
+ new PartitionGetter(tableInfo.getRowType(), partitionKeys);
+ String partition = partitionGetter.getPartition(rows.get(0));
+ assertThat(partitionInfos.get(0).getPartitionName()).isEqualTo(partition);
+
+ long partitionId = partitionInfos.get(0).getPartitionId();
+ TableBucket t1Bucket = new TableBucket(t1Id, partitionId, 0);
+
+ // then start tiering job
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ try {
+ // check the status of replica after synced
+ assertReplicaStatus(t1Bucket, 1);
+
+ // check data in paimon
+ Iterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
+ getPaimonRowCloseableIterator(t1);
+ for (InternalRow expectedRow : rows) {
+ org.apache.paimon.data.InternalRow row = paimonRowIterator.next();
+ assertThat(row.getString(0).toString())
+ .isEqualTo(expectedRow.getString(0).toString());
+ assertThat(row.getBoolean(1)).isEqualTo(expectedRow.getBoolean(1));
+ assertThat(row.getByte(2)).isEqualTo(expectedRow.getByte(2));
+ assertThat(row.getShort(3)).isEqualTo(expectedRow.getShort(3));
+ assertThat(row.getInt(4)).isEqualTo(expectedRow.getInt(4));
+ assertThat(row.getLong(5)).isEqualTo(expectedRow.getLong(5));
+ assertThat(row.getFloat(6)).isEqualTo(expectedRow.getFloat(6));
+ assertThat(row.getDouble(7)).isEqualTo(expectedRow.getDouble(7));
+ assertThat(row.getDecimal(8, 10, 2).toBigDecimal())
+ .isEqualTo(expectedRow.getDecimal(8, 10, 2).toBigDecimal());
+ assertThat(row.getString(9).toString())
+ .isEqualTo(expectedRow.getString(9).toString());
+ assertThat(row.getString(10).toString())
+ .isEqualTo(expectedRow.getString(10).toString());
+ assertThat(row.getBinary(11)).isEqualTo(expectedRow.getBytes(11));
+ assertThat(row.getBinary(12)).isEqualTo(expectedRow.getBinary(12, 5));
+ assertThat(row.getInt(13)).isEqualTo(expectedRow.getInt(13));
+ assertThat(row.getInt(14)).isEqualTo(expectedRow.getInt(14));
+ assertThat(row.getTimestamp(15, 6).getMillisecond())
+ .isEqualTo(expectedRow.getTimestampNtz(15, 6).getMillisecond());
+ assertThat(row.getTimestamp(16, 6).getMillisecond())
+ .isEqualTo(expectedRow.getTimestampLtz(16, 6).getEpochMillisecond());
+
+ // check snapshot in paimon
+ Map<String, String> properties =
+ new HashMap<String, String>() {
+ {
+ put(
+ FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+ String.format(
+ "[{\"partition_id\":%d,\"bucket\":0,\"partition_name\":\"c1=true/c2=1/c3=2/c4=3/c5=4/c6=5_0/c7=6_0/c9=v1/c10=v2/c11=7633/c12=0102030405/c13=2025-10-16/c14=10-10-10_123/c15=2025-10-16-10-10-10_123/c16=2025-10-16-10-10-10_123\",\"offset\":1}]",
+ partitionId));
+ }
+ };
+ checkSnapshotPropertyInPaimon(t1, properties);
+ }
+ } finally {
+ jobClient.cancel().get();
+ }
+ }
+
@Test
void testTieringForAlterTable() throws Exception {
TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableAlter");