This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 058cf13a6 [lake/paimon] Support tiering paimon deletion vector enabled table (#1725)
058cf13a6 is described below

commit 058cf13a61bb01adbdfb81051c7b1efa75edfe47
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Sep 22 15:28:11 2025 +0800

    [lake/paimon] Support tiering paimon deletion vector enabled table (#1725)
---
 .../flink/tiering/source/TieringSplitReader.java   |   2 +-
 .../fluss/lake/paimon/tiering/RecordWriter.java    |   5 +-
 .../paimon/tiering/mergetree/MergeTreeWriter.java  |  17 +-
 .../testutils/FlinkPaimonTieringTestBase.java      |  29 ++-
 .../lake/paimon/tiering/PaimonTieringITCase.java   | 195 ++++++++++++---------
 .../tiering/ReCreateSameTableAfterTieringTest.java |   4 +-
 6 files changed, 158 insertions(+), 94 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
index bcee1861d..673b723d8 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
@@ -234,7 +234,7 @@ public class TieringSplitReader<WriteResult>
             // instead of fail directly
             checkArgument(
                     currentTableInfo.getTableId() == 
split.getTableBucket().getTableId(),
-                    "The current table id %s for table path % is different 
from the table id %s in TieringSplit split.",
+                    "The current table id %s for table path %s is different 
from the table id %s in TieringSplit split.",
                     currentTableInfo.getTableId(),
                     tablePath,
                     split.getTableBucket().getTableId());
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
index 209d356ab..f835b0486 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
@@ -59,7 +59,10 @@ public abstract class RecordWriter<T> implements AutoCloseable {
 
     CommitMessage complete() throws Exception {
         List<CommitMessage> commitMessages = tableWrite.prepareCommit();
-        checkState(commitMessages.size() == 1, "The size of CommitMessage must 
be 1.");
+        checkState(
+                commitMessages.size() == 1,
+                "The size of CommitMessage must be 1, but got %s.",
+                commitMessages);
         return commitMessages.get(0);
     }
 
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
index 4a32392bf..c27ebef6d 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
@@ -22,6 +22,7 @@ import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.record.LogRecord;
 
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.RowKeyExtractor;
 import org.apache.paimon.table.sink.TableWriteImpl;
@@ -29,6 +30,7 @@ import org.apache.paimon.table.sink.TableWriteImpl;
 import javax.annotation.Nullable;
 
 import java.util.List;
+import java.util.Map;
 
 import static 
org.apache.fluss.lake.paimon.tiering.PaimonLakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
 import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
@@ -36,6 +38,9 @@ import static 
org.apache.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
 /** A {@link RecordWriter} to write to Paimon's primary-key table. */
 public class MergeTreeWriter extends RecordWriter<KeyValue> {
 
+    // the option key to configure the temporary directory used by fluss 
tiering
+    private static final String FLUSS_TIERING_TMP_DIR_KEY = 
"fluss.tiering.io-tmpdir";
+
     private final KeyValue keyValue = new KeyValue();
 
     private final RowKeyExtractor rowKeyExtractor;
@@ -55,8 +60,18 @@ public class MergeTreeWriter extends RecordWriter<KeyValue> {
     }
 
     private static TableWriteImpl<KeyValue> createTableWrite(FileStoreTable 
fileStoreTable) {
+        // we allow users to configure the temporary directory used by fluss 
tiering
+        // since the default java.io.tmpdir may not be suitable.
+        // currently, we don't expose the option, as a workaround way, maybe 
in the future we can
+        // expose it if it's needed
+        Map<String, String> props = fileStoreTable.options();
+        String tmpDir =
+                props.getOrDefault(FLUSS_TIERING_TMP_DIR_KEY, 
System.getProperty("java.io.tmpdir"));
         //noinspection unchecked
-        return (TableWriteImpl<KeyValue>) 
fileStoreTable.newWrite(FLUSS_LAKE_TIERING_COMMIT_USER);
+        return (TableWriteImpl<KeyValue>)
+                fileStoreTable
+                        .newWrite(FLUSS_LAKE_TIERING_COMMIT_USER)
+                        .withIOManager(IOManager.create(tmpDir));
     }
 
     @Override
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 7405841b5..995b2db46 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -59,6 +59,7 @@ import java.nio.file.Files;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -329,8 +330,25 @@ public abstract class FlinkPaimonTieringTestBase {
         return createPkTable(tablePath, 1);
     }
 
+    protected long createPkTable(
+            TablePath tablePath,
+            Map<String, String> tableProperties,
+            Map<String, String> tableCustomProperties)
+            throws Exception {
+        return createPkTable(tablePath, 1, tableProperties, 
tableCustomProperties);
+    }
+
     protected long createPkTable(TablePath tablePath, int bucketNum) throws 
Exception {
-        TableDescriptor table1Descriptor =
+        return createPkTable(tablePath, bucketNum, Collections.emptyMap(), 
Collections.emptyMap());
+    }
+
+    protected long createPkTable(
+            TablePath tablePath,
+            int bucketNum,
+            Map<String, String> tableProperties,
+            Map<String, String> tableCustomProperties)
+            throws Exception {
+        TableDescriptor.Builder tableDescriptor =
                 TableDescriptor.builder()
                         .schema(
                                 Schema.newBuilder()
@@ -340,9 +358,10 @@ public abstract class FlinkPaimonTieringTestBase {
                                         .build())
                         .distributedBy(bucketNum)
                         .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
-                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500))
-                        .build();
-        return createTable(tablePath, table1Descriptor);
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
+        tableDescriptor.customProperties(tableCustomProperties);
+        tableDescriptor.properties(tableProperties);
+        return createTable(tablePath, tableDescriptor.build());
     }
 
     protected void dropTable(TablePath tablePath) throws Exception {
@@ -422,7 +441,7 @@ public abstract class FlinkPaimonTieringTestBase {
                 "bucket " + tb + "not synced");
     }
 
-    protected void checkDataInPaimonPrimayKeyTable(
+    protected void checkDataInPaimonPrimaryKeyTable(
             TablePath tablePath, List<InternalRow> expectedRows) throws 
Exception {
         Iterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
                 getPaimonRowCloseableIterator(tablePath);
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index 301427186..442962f31 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -92,97 +92,124 @@ class PaimonTieringITCase extends FlinkPaimonTieringTestBase {
         // then start tiering job
         JobClient jobClient = buildTieringJob(execEnv);
 
-        // check the status of replica after synced
-        assertReplicaStatus(t1Bucket, 3);
-        // check data in paimon
-        checkDataInPaimonPrimayKeyTable(t1, rows);
-        // check snapshot property in paimon
-        Map<String, String> properties =
-                new HashMap<String, String>() {
-                    {
-                        put(
-                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
-                                "[{\"bucket_id\":0,\"log_offset\":3}]");
-                    }
-                };
-        checkSnapshotPropertyInPaimon(t1, properties);
-
-        // then, create another log table
-        TablePath t2 = TablePath.of(DEFAULT_DB, "logTable");
-        long t2Id = createLogTable(t2);
-        TableBucket t2Bucket = new TableBucket(t2Id, 0);
-        List<InternalRow> flussRows = new ArrayList<>();
-        // write records
-        for (int i = 0; i < 10; i++) {
-            rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
-            flussRows.addAll(rows);
+        try {
+            // check the status of replica after synced
+            assertReplicaStatus(t1Bucket, 3);
+            // check data in paimon
+            checkDataInPaimonPrimaryKeyTable(t1, rows);
+            // check snapshot property in paimon
+            Map<String, String> properties =
+                    new HashMap<String, String>() {
+                        {
+                            put(
+                                    FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                    "[{\"bucket_id\":0,\"log_offset\":3}]");
+                        }
+                    };
+            checkSnapshotPropertyInPaimon(t1, properties);
+
+            // then, create another log table
+            TablePath t2 = TablePath.of(DEFAULT_DB, "logTable");
+            long t2Id = createLogTable(t2);
+            TableBucket t2Bucket = new TableBucket(t2Id, 0);
+            List<InternalRow> flussRows = new ArrayList<>();
+            // write records
+            for (int i = 0; i < 10; i++) {
+                rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
+                flussRows.addAll(rows);
+                // write records
+                writeRows(t2, rows, true);
+            }
+            // check the status of replica after synced;
+            // note: we can't update log start offset for unaware bucket mode 
log table
+            assertReplicaStatus(t2Bucket, 30);
+
+            // check data in paimon
+            checkDataInPaimonAppendOnlyTable(t2, flussRows, 0);
+
+            // then write data to the pk tables
             // write records
-            writeRows(t2, rows, true);
+            rows = Arrays.asList(row(1, "v111"), row(2, "v222"), row(3, 
"v333"));
+            // write records
+            writeRows(t1, rows, false);
+
+            // check the status of replica of t2 after synced
+            // not check start offset since we won't
+            // update start log offset for primary key table
+            assertReplicaStatus(t1Bucket, 9);
+
+            checkDataInPaimonPrimaryKeyTable(t1, rows);
+
+            // then create partitioned table and wait partitions are ready
+            TablePath partitionedTablePath = TablePath.of(DEFAULT_DB, 
"partitionedTable");
+            Tuple2<Long, TableDescriptor> tableIdAndDescriptor =
+                    createPartitionedTable(partitionedTablePath);
+            Map<Long, String> partitionNameByIds = 
waitUntilPartitions(partitionedTablePath);
+
+            // now, write rows into partitioned table
+            TableDescriptor partitionedTableDescriptor = 
tableIdAndDescriptor.f1;
+            Map<String, List<InternalRow>> writtenRowsByPartition =
+                    writeRowsIntoPartitionedTable(
+                            partitionedTablePath, partitionedTableDescriptor, 
partitionNameByIds);
+            long tableId = tableIdAndDescriptor.f0;
+
+            // wait until synced to paimon
+            for (Long partitionId : partitionNameByIds.keySet()) {
+                TableBucket tableBucket = new TableBucket(tableId, 
partitionId, 0);
+                assertReplicaStatus(tableBucket, 3);
+            }
+
+            // now, let's check data in paimon per partition
+            // check data in paimon
+            String partitionCol = 
partitionedTableDescriptor.getPartitionKeys().get(0);
+            for (String partitionName : partitionNameByIds.values()) {
+                checkDataInPaimonAppendOnlyPartitionedTable(
+                        partitionedTablePath,
+                        Collections.singletonMap(partitionCol, partitionName),
+                        writtenRowsByPartition.get(partitionName),
+                        0);
+            }
+
+            properties =
+                    new HashMap<String, String>() {
+                        {
+                            put(
+                                    FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                    "["
+                                            + 
"{\"partition_id\":0,\"bucket_id\":0,\"partition_name\":\"date=2025\",\"log_offset\":3},"
+                                            + 
"{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"date=2026\",\"log_offset\":3}"
+                                            + "]");
+                        }
+                    };
+            checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
+        } finally {
+            jobClient.cancel().get();
         }
-        // check the status of replica after synced;
-        // note: we can't update log start offset for unaware bucket mode log 
table
-        assertReplicaStatus(t2Bucket, 30);
-
-        // check data in paimon
-        checkDataInPaimonAppendOnlyTable(t2, flussRows, 0);
+    }
 
-        // then write data to the pk tables
-        // write records
-        rows = Arrays.asList(row(1, "v111"), row(2, "v222"), row(3, "v333"));
+    @Test
+    void testTieringToDvEnabledTable() throws Exception {
+        TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableWithDv");
+        long t1Id =
+                createPkTable(
+                        t1,
+                        
Collections.singletonMap("table.datalake.auto-compaction", "true"),
+                        
Collections.singletonMap("paimon.deletion-vectors.enabled", "true"));
         // write records
+        List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), 
row(3, "v3"));
         writeRows(t1, rows, false);
+        waitUntilSnapshot(t1Id, 1, 0);
 
-        // check the status of replica of t2 after synced
-        // not check start offset since we won't
-        // update start log offset for primary key table
-        assertReplicaStatus(t1Bucket, 9);
-
-        checkDataInPaimonPrimayKeyTable(t1, rows);
-
-        // then create partitioned table and wait partitions are ready
-        TablePath partitionedTablePath = TablePath.of(DEFAULT_DB, 
"partitionedTable");
-        Tuple2<Long, TableDescriptor> tableIdAndDescriptor =
-                createPartitionedTable(partitionedTablePath);
-        Map<Long, String> partitionNameByIds = 
waitUntilPartitions(partitionedTablePath);
-
-        // now, write rows into partitioned table
-        TableDescriptor partitionedTableDescriptor = tableIdAndDescriptor.f1;
-        Map<String, List<InternalRow>> writtenRowsByPartition =
-                writeRowsIntoPartitionedTable(
-                        partitionedTablePath, partitionedTableDescriptor, 
partitionNameByIds);
-        long tableId = tableIdAndDescriptor.f0;
-
-        // wait until synced to paimon
-        for (Long partitionId : partitionNameByIds.keySet()) {
-            TableBucket tableBucket = new TableBucket(tableId, partitionId, 0);
-            assertReplicaStatus(tableBucket, 3);
-        }
-
-        // now, let's check data in paimon per partition
-        // check data in paimon
-        String partitionCol = 
partitionedTableDescriptor.getPartitionKeys().get(0);
-        for (String partitionName : partitionNameByIds.values()) {
-            checkDataInPaimonAppendOnlyPartitionedTable(
-                    partitionedTablePath,
-                    Collections.singletonMap(partitionCol, partitionName),
-                    writtenRowsByPartition.get(partitionName),
-                    0);
+        // then start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            // check the status of replica after synced
+            assertReplicaStatus(new TableBucket(t1Id, 0), 3);
+            // check data in paimon
+            checkDataInPaimonPrimaryKeyTable(t1, rows);
+        } finally {
+            jobClient.cancel().get();
         }
-
-        properties =
-                new HashMap<String, String>() {
-                    {
-                        put(
-                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
-                                "["
-                                        + 
"{\"partition_id\":0,\"bucket_id\":0,\"partition_name\":\"date=2025\",\"log_offset\":3},"
-                                        + 
"{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"date=2026\",\"log_offset\":3}"
-                                        + "]");
-                    }
-                };
-        checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
-
-        jobClient.cancel().get();
     }
 
     private Tuple2<Long, TableDescriptor> createPartitionedTable(TablePath 
partitionedTablePath)
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
index 5a0b8ccd9..6520770c3 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
@@ -73,7 +73,7 @@ class ReCreateSameTableAfterTieringTest extends FlinkPaimonTieringTestBase {
         // check the status of replica after synced
         assertReplicaStatus(t1Bucket, 3);
         // check data in paimon
-        checkDataInPaimonPrimayKeyTable(t1, rows);
+        checkDataInPaimonPrimaryKeyTable(t1, rows);
 
         // then drop the table
         dropTable(t1);
@@ -88,7 +88,7 @@ class ReCreateSameTableAfterTieringTest extends 
FlinkPaimonTieringTestBase {
         // check the status of replica after synced
         assertReplicaStatus(t2Bucket, 2);
         // check data in paimon
-        checkDataInPaimonPrimayKeyTable(t1, newRows);
+        checkDataInPaimonPrimaryKeyTable(t1, newRows);
 
         // stop the tiering job
         jobClient.cancel().get();

Reply via email to