This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 231f1f7ee [lake/iceberg] Add IT for Iceberg files compaction for primary key table (#1658)
231f1f7ee is described below

commit 231f1f7ee93c6aa915de6676ca8642063c77f558
Author: Junbo Wang <[email protected]>
AuthorDate: Tue Sep 9 19:07:12 2025 +0800

    [lake/iceberg] Add IT for Iceberg files compaction for primary key table (#1658)
    
    ---------
    
    Co-authored-by: maxcwang <[email protected]>
---
 .../maintenance/IcebergRewriteDataFiles.java       |  20 ++-
 .../iceberg/maintenance/RewriteDataFileResult.java |  13 +-
 .../lake/iceberg/tiering/IcebergLakeCommitter.java |  20 ++-
 .../iceberg/maintenance/IcebergRewriteITCase.java  | 150 +++++++++++++++++++--
 .../testutils/FlinkIcebergTieringTestBase.java     | 133 ++++--------------
 .../lake/iceberg/tiering/IcebergTieringITCase.java |  86 +++++++++++-
 .../tiering/IcebergWriteResultSerializerTest.java  |   4 +-
 7 files changed, 293 insertions(+), 133 deletions(-)

diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
index 742a808a3..4f93bf983 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
@@ -26,6 +26,7 @@ import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.ContentScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.IcebergGenericReader;
 import org.apache.iceberg.data.Record;
@@ -82,10 +83,15 @@ public class IcebergRewriteDataFiles {
         return this;
     }
 
-    private List<CombinedScanTask> planRewriteFileGroups() throws IOException {
+    private List<CombinedScanTask> planRewriteFileGroups(long snapshotId) throws IOException {
         List<FileScanTask> fileScanTasks = new ArrayList<>();
         try (CloseableIterable<FileScanTask> tasks =
-                table.newScan().includeColumnStats().filter(filter).ignoreResiduals().planFiles()) {
+                table.newScan()
+                        .useSnapshot(snapshotId)
+                        .includeColumnStats()
+                        .filter(filter)
+                        .ignoreResiduals()
+                        .planFiles()) {
             tasks.forEach(fileScanTasks::add);
         }
 
@@ -137,7 +143,12 @@ public class IcebergRewriteDataFiles {
     public RewriteDataFileResult execute() {
         try {
             // plan the file groups to be rewrite
-            List<CombinedScanTask> tasksToRewrite = planRewriteFileGroups();
+            Snapshot snapshot = table.currentSnapshot();
+            // if no snapshot, just return
+            if (snapshot == null) {
+                return null;
+            }
+            List<CombinedScanTask> tasksToRewrite = planRewriteFileGroups(snapshot.snapshotId());
             if (tasksToRewrite.isEmpty()) {
                 return null;
             }
@@ -152,7 +163,8 @@ public class IcebergRewriteDataFiles {
                                 .collect(Collectors.toList()));
             }
             LOG.info("Finish rewriting files from {} to {}.", deletedDataFiles, addedDataFiles);
-            return new RewriteDataFileResult(deletedDataFiles, addedDataFiles);
+            return new RewriteDataFileResult(
+                    snapshot.snapshotId(), deletedDataFiles, addedDataFiles);
         } catch (Exception e) {
             throw new RuntimeException(
                     String.format("Fail to compact bucket %s of table %s.", bucket, table.name()),
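
The hunks above pin compaction planning to the snapshot that was current when the rewrite
started and carry that snapshot id in the result. A minimal, illustrative sketch of the same
pattern against the Iceberg core API (the "table" variable and the class name are assumptions
for the example, not code from this commit):

    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.CloseableIterable;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class SnapshotPinnedPlanningSketch {
        // Plan the files to rewrite against one fixed snapshot so that the commit
        // phase can later validate from exactly the same table state.
        static List<FileScanTask> plan(Table table) throws IOException {
            Snapshot snapshot = table.currentSnapshot();
            if (snapshot == null) {
                return new ArrayList<>(); // empty table, nothing to compact yet
            }
            List<FileScanTask> tasks = new ArrayList<>();
            try (CloseableIterable<FileScanTask> planned =
                    table.newScan().useSnapshot(snapshot.snapshotId()).planFiles()) {
                planned.forEach(tasks::add);
            }
            return tasks;
        }
    }
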
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java
index e24580cab..79e0dfe21 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java
@@ -28,10 +28,13 @@ public class RewriteDataFileResult implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    private final long snapshotId;
     private final List<DataFile> deletedDataFiles;
     private final List<DataFile> addedDataFiles;
 
-    public RewriteDataFileResult(List<DataFile> deletedDataFiles, List<DataFile> addedDataFiles) {
+    public RewriteDataFileResult(
+            long snapshotId, List<DataFile> deletedDataFiles, List<DataFile> addedDataFiles) {
+        this.snapshotId = snapshotId;
         this.deletedDataFiles = deletedDataFiles;
         this.addedDataFiles = addedDataFiles;
     }
@@ -44,10 +47,16 @@ public class RewriteDataFileResult implements Serializable {
         return addedDataFiles;
     }
 
+    public long snapshotId() {
+        return snapshotId;
+    }
+
     @Override
     public String toString() {
         return "RewriteDataFileResult{"
-                + "deletedDataFiles="
+                + "snapshotId="
+                + snapshotId
+                + ", deletedDataFiles="
                 + deletedDataFiles
                 + ", addedDataFiles="
                 + addedDataFiles
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
index 604f6cc6b..80b22e4d9 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
@@ -28,6 +28,7 @@ import org.apache.fluss.utils.json.BucketOffsetJsonSerde;
 
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.RewriteFiles;
@@ -162,11 +163,20 @@ public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, I
             Map<String, String> snapshotProperties) {
         icebergTable.refresh();
         RewriteFiles rewriteFiles = icebergTable.newRewrite();
-        for (RewriteDataFileResult rewriteDataFileResult : rewriteDataFileResults) {
-            rewriteDataFileResult.addedDataFiles().forEach(rewriteFiles::addFile);
-            rewriteDataFileResult.deletedDataFiles().forEach(rewriteFiles::deleteFile);
-        }
         try {
+            if (rewriteDataFileResults.stream()
+                            .map(RewriteDataFileResult::snapshotId)
+                            .distinct()
+                            .count()
+                    > 1) {
+                throw new IllegalArgumentException(
+                        "Rewrite data file results must have same snapshot id.");
+            }
+            rewriteFiles.validateFromSnapshot(rewriteDataFileResults.get(0).snapshotId());
+            for (RewriteDataFileResult rewriteDataFileResult : rewriteDataFileResults) {
+                rewriteDataFileResult.addedDataFiles().forEach(rewriteFiles::addFile);
+                rewriteDataFileResult.deletedDataFiles().forEach(rewriteFiles::deleteFile);
+            }
             return commit(rewriteFiles, snapshotProperties);
         } catch (Exception e) {
             List<String> rewriteAddedDataFiles =
@@ -174,7 +184,7 @@ public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, I
                             .flatMap(
                                     rewriteDataFileResult ->
                                             rewriteDataFileResult.addedDataFiles().stream())
-                            .map(dataFile -> dataFile.path().toString())
+                            .map(ContentFile::location)
                             .collect(Collectors.toList());
             LOG.error(
                     "Failed to commit rewrite files to iceberg, delete rewrite added files {}.",
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
index a1a28b2d0..4b6d9de71 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
@@ -19,20 +19,27 @@
 package org.apache.fluss.lake.iceberg.maintenance;
 
 import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
+import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
 
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.iceberg.data.Record;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 
 import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Integration test for Iceberg compaction. */
 class IcebergRewriteITCase extends FlinkIcebergTieringTestBase {
@@ -40,6 +47,19 @@ class IcebergRewriteITCase extends FlinkIcebergTieringTestBase {
 
     private static StreamExecutionEnvironment execEnv;
 
+    private static final Schema pkSchema =
+            Schema.newBuilder()
+                    .column("f_int", DataTypes.INT())
+                    .column("f_string", DataTypes.STRING())
+                    .primaryKey("f_int")
+                    .build();
+
+    private static final Schema logSchema =
+            Schema.newBuilder()
+                    .column("f_int", DataTypes.INT())
+                    .column("f_string", DataTypes.STRING())
+                    .build();
+
     @BeforeAll
     protected static void beforeAll() {
         FlinkIcebergTieringTestBase.beforeAll();
@@ -48,42 +68,148 @@ class IcebergRewriteITCase extends FlinkIcebergTieringTestBase {
         execEnv.enableCheckpointing(1000);
     }
 
+    @Test
+    void testPkTableCompaction() throws Exception {
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_1");
+            long t1Id = createPkTable(t1, 1, true, pkSchema);
+            TableBucket t1Bucket = new TableBucket(t1Id, 0);
+            List<InternalRow> flussRows = new ArrayList<>();
+
+            List<InternalRow> rows = Collections.singletonList(row(1, "v1"));
+            writeIcebergTableRecords(t1, t1Bucket, 1, false, rows);
+            flussRows.addAll(rows);
+
+            rows = Collections.singletonList(row(2, "v1"));
+            writeIcebergTableRecords(t1, t1Bucket, 2, false, rows);
+            flussRows.addAll(rows);
+
+            // add pos-delete
+            rows = Arrays.asList(row(3, "v1"), row(3, "v2"));
+            writeIcebergTableRecords(t1, t1Bucket, 5, false, rows);
+            // one UPDATE_BEFORE and one UPDATE_AFTER
+            checkFileStatusInIcebergTable(t1, 3, true);
+            flussRows.add(rows.get(1));
+
+            // trigger compaction
+            rows = Collections.singletonList(row(4, "v1"));
+            writeIcebergTableRecords(t1, t1Bucket, 6, false, rows);
+            checkFileStatusInIcebergTable(t1, 2, false);
+            flussRows.addAll(rows);
+
+            checkRecords(getIcebergRecords(t1), flussRows);
+        } finally {
+            jobClient.cancel().get();
+        }
+    }
+
+    private void checkRecords(List<Record> actualRows, List<InternalRow> expectedRows) {
+        // check records size
+        assertThat(actualRows.size()).isEqualTo(expectedRows.size());
+
+        // check records content
+        Iterator<Record> actualIterator =
+                actualRows.stream()
+                        .sorted(Comparator.comparingInt((Record r) -> (int) r.get(0)))
+                        .iterator();
+        Iterator<InternalRow> expectedIterator =
+                expectedRows.stream().sorted(Comparator.comparingInt(r -> r.getInt(0))).iterator();
+        while (actualIterator.hasNext() && expectedIterator.hasNext()) {
+            Record record = actualIterator.next();
+            InternalRow row = expectedIterator.next();
+            assertThat(record.get(0)).isEqualTo(row.getInt(0));
+            assertThat(record.get(1)).isEqualTo(row.getString(1).toString());
+        }
+        assertThat(actualIterator.hasNext()).isFalse();
+        assertThat(expectedIterator.hasNext()).isFalse();
+    }
+
+    @Test
+    void testPkTableCompactionWithConflict() throws Exception {
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_2");
+            long t1Id = createPkTable(t1, 1, true, pkSchema);
+            TableBucket t1Bucket = new TableBucket(t1Id, 0);
+            List<InternalRow> flussRows = new ArrayList<>();
+
+            List<InternalRow> rows = Collections.singletonList(row(1, "v1"));
+            flussRows.addAll(writeIcebergTableRecords(t1, t1Bucket, 1, false, rows));
+            checkFileStatusInIcebergTable(t1, 1, false);
+
+            rows = Collections.singletonList(row(2, "v1"));
+            flussRows.addAll(writeIcebergTableRecords(t1, t1Bucket, 2, false, rows));
+
+            rows = Collections.singletonList(row(3, "v1"));
+            flussRows.addAll(writeIcebergTableRecords(t1, t1Bucket, 3, false, rows));
+
+            // add pos-delete and trigger compaction
+            rows = Arrays.asList(row(4, "v1"), row(4, "v2"));
+            flussRows.add(writeIcebergTableRecords(t1, t1Bucket, 6, false, rows).get(1));
+            // rewritten files should fail to commit due to conflict, add check here
+            checkRecords(getIcebergRecords(t1), flussRows);
+            // 4 data file and 1 delete file
+            checkFileStatusInIcebergTable(t1, 4, true);
+
+            // previous compaction conflicts won't prevent further compaction, and check iceberg
+            // records
+            rows = Collections.singletonList(row(5, "v1"));
+            flussRows.addAll(writeIcebergTableRecords(t1, t1Bucket, 7, false, rows));
+            checkRecords(getIcebergRecords(t1), flussRows);
+            checkFileStatusInIcebergTable(t1, 2, false);
+        } finally {
+            jobClient.cancel().get();
+        }
+    }
+
     @Test
     void testLogTableCompaction() throws Exception {
         JobClient jobClient = buildTieringJob(execEnv);
         try {
             TablePath t1 = TablePath.of(DEFAULT_DB, "log_table");
-            long t1Id = createLogTable(t1, true);
+            long t1Id = createLogTable(t1, 1, true, logSchema);
             TableBucket t1Bucket = new TableBucket(t1Id, 0);
 
             int i = 0;
             List<InternalRow> flussRows = new ArrayList<>();
-            flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
+            flussRows.addAll(
+                    writeIcebergTableRecords(
+                            t1, t1Bucket, ++i, true, Collections.singletonList(row(1, "v1"))));
 
-            flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
+            flussRows.addAll(
+                    writeIcebergTableRecords(
+                            t1, t1Bucket, ++i, true, Collections.singletonList(row(1, "v1"))));
 
-            flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
-            checkFileCountInIcebergTable(t1, 3);
+            flussRows.addAll(
+                    writeIcebergTableRecords(
+                            t1, t1Bucket, ++i, true, Collections.singletonList(row(1, "v1"))));
+            checkFileStatusInIcebergTable(t1, 3, false);
 
             // Write should trigger compaction now since the current data file count is greater or
             // equal MIN_FILES_TO_COMPACT
-            flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
+            flussRows.addAll(
+                    writeIcebergTableRecords(
+                            t1, t1Bucket, ++i, true, Collections.singletonList(row(1, "v1"))));
             // Should only have two files now, one file it for newly written, one file is for target
             // compacted file
-            checkFileCountInIcebergTable(t1, 2);
+            checkFileStatusInIcebergTable(t1, 2, false);
 
             // check data in iceberg to make sure compaction won't lose data or duplicate data
-            checkDataInIcebergAppendOnlyTable(t1, flussRows, 0);
+            checkRecords(getIcebergRecords(t1), flussRows);
         } finally {
             jobClient.cancel().get();
         }
     }
 
-    private List<InternalRow> writeLogTableRecords(
-            TablePath tablePath, TableBucket tableBucket, long expectedLogEndOffset)
+    private List<InternalRow> writeIcebergTableRecords(
+            TablePath tablePath,
+            TableBucket tableBucket,
+            long expectedLogEndOffset,
+            boolean append,
+            List<InternalRow> rows)
             throws Exception {
-        List<InternalRow> rows = Arrays.asList(row(1, "v1"));
-        writeRows(tablePath, rows, true);
+        writeRows(tablePath, rows, append);
         assertReplicaStatus(tableBucket, expectedLogEndOffset);
         return rows;
     }
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index fdf5b02c5..2a216ddea 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -24,7 +24,6 @@ import org.apache.fluss.client.table.Table;
 import org.apache.fluss.client.table.writer.AppendWriter;
 import org.apache.fluss.client.table.writer.TableWriter;
 import org.apache.fluss.client.table.writer.UpsertWriter;
-import org.apache.fluss.config.AutoPartitionTimeUnit;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.FlussRuntimeException;
@@ -38,8 +37,6 @@ import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.server.replica.Replica;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.server.zk.ZooKeeperClient;
-import org.apache.fluss.types.DataTypes;
-import org.apache.fluss.utils.DateTimeUtils;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.core.execution.JobClient;
@@ -67,10 +64,6 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.file.Files;
 import java.time.Duration;
-import java.time.LocalDate;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -187,85 +180,36 @@ public class FlinkIcebergTieringTestBase {
         return catalog;
     }
 
-    protected long createPkTable(TablePath tablePath) throws Exception {
-        return createPkTable(tablePath, false);
-    }
-
-    protected long createPkTable(TablePath tablePath, boolean enableAutoCompaction)
-            throws Exception {
-        return createPkTable(tablePath, 1, enableAutoCompaction);
-    }
-
-    protected long createLogTable(TablePath tablePath) throws Exception {
-        return createLogTable(tablePath, false);
-    }
-
-    protected long createLogTable(TablePath tablePath, boolean enableAutoCompaction)
-            throws Exception {
-        return createLogTable(tablePath, 1, false, enableAutoCompaction);
-    }
-
-    protected long createLogTable(
-            TablePath tablePath, int bucketNum, boolean isPartitioned, boolean enableAutoCompaction)
+    protected long createPkTable(
+            TablePath tablePath, int bucketNum, boolean enableAutoCompaction, Schema schema)
             throws Exception {
-        Schema.Builder schemaBuilder =
-                Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING());
-
-        TableDescriptor.Builder tableBuilder =
+        TableDescriptor.Builder pkTableBuilder =
                 TableDescriptor.builder()
-                        .distributedBy(bucketNum, "a")
+                        .schema(schema)
+                        .distributedBy(bucketNum)
                         .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
                         .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
 
-        if (isPartitioned) {
-            schemaBuilder.column("c", DataTypes.STRING());
-            tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true);
-            tableBuilder.partitionedBy("c");
-            tableBuilder.property(
-                    ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR);
-        }
         if (enableAutoCompaction) {
-            tableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true");
+            pkTableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true");
         }
-        tableBuilder.schema(schemaBuilder.build());
-        return createTable(tablePath, tableBuilder.build());
+        return createTable(tablePath, pkTableBuilder.build());
     }
 
-    protected long createPkTable(TablePath tablePath, int bucketNum, boolean enableAutoCompaction)
+    protected long createLogTable(
+            TablePath tablePath, int bucketNum, boolean enableAutoCompaction, Schema schema)
             throws Exception {
-        TableDescriptor.Builder pkTableBuilder =
+        TableDescriptor.Builder tableBuilder =
                 TableDescriptor.builder()
-                        .schema(
-                                Schema.newBuilder()
-                                        .column("f_boolean", DataTypes.BOOLEAN())
-                                        .column("f_byte", DataTypes.TINYINT())
-                                        .column("f_short", DataTypes.SMALLINT())
-                                        .column("f_int", DataTypes.INT())
-                                        .column("f_long", DataTypes.BIGINT())
-                                        .column("f_float", DataTypes.FLOAT())
-                                        .column("f_double", DataTypes.DOUBLE())
-                                        .column("f_string", DataTypes.STRING())
-                                        .column("f_decimal1", DataTypes.DECIMAL(5, 2))
-                                        .column("f_decimal2", DataTypes.DECIMAL(20, 0))
-                                        .column("f_timestamp_ltz1", DataTypes.TIMESTAMP_LTZ(3))
-                                        .column("f_timestamp_ltz2", DataTypes.TIMESTAMP_LTZ(6))
-                                        .column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3))
-                                        .column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6))
-                                        .column("f_binary", DataTypes.BINARY(4))
-                                        .column("f_date", DataTypes.DATE())
-                                        .column("f_time", DataTypes.TIME())
-                                        .column("f_char", DataTypes.CHAR(3))
-                                        .column("f_bytes", DataTypes.BYTES())
-                                        .primaryKey("f_int")
-                                        .build())
-                        .distributedBy(bucketNum)
+                        .distributedBy(bucketNum, "f_int")
                         .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
                         .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
 
         if (enableAutoCompaction) {
-            pkTableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true");
+            tableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true");
         }
-        return createTable(tablePath, pkTableBuilder.build());
+        tableBuilder.schema(schema);
+        return createTable(tablePath, tableBuilder.build());
     }
 
     protected long createTable(TablePath tablePath, TableDescriptor tableDescriptor)
@@ -342,45 +286,14 @@ public class FlinkIcebergTieringTestBase {
         }
     }
 
-    protected void checkDataInIcebergPrimaryKeyTable(
-            TablePath tablePath, List<InternalRow> expectedRows) throws Exception {
+    protected List<Record> getIcebergRecords(TablePath tablePath) throws IOException {
+        List<Record> icebergRecords = new ArrayList<>();
         try (CloseableIterator<Record> records = getIcebergRows(tablePath)) {
-            for (InternalRow row : expectedRows) {
-                Record record = records.next();
-                assertThat(record.get(0)).isEqualTo(row.getBoolean(0));
-                assertThat(record.get(1)).isEqualTo((int) row.getByte(1));
-                assertThat(record.get(2)).isEqualTo((int) row.getShort(2));
-                assertThat(record.get(3)).isEqualTo(row.getInt(3));
-                assertThat(record.get(4)).isEqualTo(row.getLong(4));
-                assertThat(record.get(5)).isEqualTo(row.getFloat(5));
-                assertThat(record.get(6)).isEqualTo(row.getDouble(6));
-                assertThat(record.get(7)).isEqualTo(row.getString(7).toString());
-                // Iceberg expects BigDecimal for decimal types.
-                assertThat(record.get(8)).isEqualTo(row.getDecimal(8, 5, 2).toBigDecimal());
-                assertThat(record.get(9)).isEqualTo(row.getDecimal(9, 20, 0).toBigDecimal());
-                assertThat(record.get(10))
-                        .isEqualTo(
-                                OffsetDateTime.ofInstant(
-                                        row.getTimestampLtz(10, 3).toInstant(), ZoneOffset.UTC));
-                assertThat(record.get(11))
-                        .isEqualTo(
-                                OffsetDateTime.ofInstant(
-                                        row.getTimestampLtz(11, 6).toInstant(), ZoneOffset.UTC));
-                assertThat(record.get(12)).isEqualTo(row.getTimestampNtz(12, 6).toLocalDateTime());
-                assertThat(record.get(13)).isEqualTo(row.getTimestampNtz(13, 6).toLocalDateTime());
-                // Iceberg's Record interface expects ByteBuffer for binary types.
-                assertThat(record.get(14)).isEqualTo(ByteBuffer.wrap(row.getBinary(14, 4)));
-                assertThat(record.get(15))
-                        .isEqualTo(DateTimeUtils.toLocalDate(row.getInt(15)))
-                        .isEqualTo(LocalDate.of(2023, 10, 25));
-                assertThat(record.get(16))
-                        .isEqualTo(DateTimeUtils.toLocalTime(row.getInt(16)))
-                        .isEqualTo(LocalTime.of(9, 30, 0, 0));
-                assertThat(record.get(17)).isEqualTo(row.getChar(17, 3).toString());
-                assertThat(record.get(18)).isEqualTo(ByteBuffer.wrap(row.getBytes(18)));
+            while (records.hasNext()) {
+                icebergRecords.add(records.next());
             }
-            assertThat(records.hasNext()).isFalse();
         }
+        return icebergRecords;
     }
 
     protected void checkDataInIcebergAppendOnlyTable(
@@ -400,12 +313,18 @@ public class FlinkIcebergTieringTestBase {
         }
     }
 
-    protected void checkFileCountInIcebergTable(TablePath tablePath, int expectedFileCount)
+    protected void checkFileStatusInIcebergTable(
+            TablePath tablePath, int expectedFileCount, boolean shouldDeleteFileExist)
             throws IOException {
         org.apache.iceberg.Table table = icebergCatalog.loadTable(toIceberg(tablePath));
         int count = 0;
         try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
             for (FileScanTask ignored : tasks) {
+                if (shouldDeleteFileExist) {
+                    assertThat(ignored.deletes()).isNotEmpty();
+                } else {
+                    assertThat(ignored.deletes()).isEmpty();
+                }
                 count++;
             }
         }
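
The reworked helper above asserts on delete files as well as the data-file count:
FileScanTask.deletes() lists the delete files attached to each planned data file, so after a
successful compaction it should be empty. A compact sketch of that check (method and variable
names here are assumptions, mirroring the helper in the diff):

    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.CloseableIterable;

    import java.io.IOException;

    class FileStatusCheckSketch {
        // Count data files and verify whether any still carry attached delete files.
        static int countDataFiles(Table table, boolean expectDeletes) throws IOException {
            int count = 0;
            try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
                for (FileScanTask task : tasks) {
                    if (task.deletes().isEmpty() == expectDeletes) {
                        throw new AssertionError("unexpected delete-file status for " + task.file());
                    }
                    count++;
                }
            }
            return count;
        }
    }
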
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
index 9561c3a71..a8dbe76e3 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
@@ -30,24 +30,33 @@ import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
 import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.utils.DateTimeUtils;
 import org.apache.fluss.utils.TypeUtils;
 import org.apache.fluss.utils.types.Tuple2;
 
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.iceberg.data.Record;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
+import java.nio.ByteBuffer;
 import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import static org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
 import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** The ITCase for tiering into iceberg. */
 class IcebergTieringITCase extends FlinkIcebergTieringTestBase {
@@ -56,6 +65,36 @@ class IcebergTieringITCase extends FlinkIcebergTieringTestBase {
 
     private static StreamExecutionEnvironment execEnv;
 
+    private static final Schema pkSchema =
+            Schema.newBuilder()
+                    .column("f_boolean", DataTypes.BOOLEAN())
+                    .column("f_byte", DataTypes.TINYINT())
+                    .column("f_short", DataTypes.SMALLINT())
+                    .column("f_int", DataTypes.INT())
+                    .column("f_long", DataTypes.BIGINT())
+                    .column("f_float", DataTypes.FLOAT())
+                    .column("f_double", DataTypes.DOUBLE())
+                    .column("f_string", DataTypes.STRING())
+                    .column("f_decimal1", DataTypes.DECIMAL(5, 2))
+                    .column("f_decimal2", DataTypes.DECIMAL(20, 0))
+                    .column("f_timestamp_ltz1", DataTypes.TIMESTAMP_LTZ(3))
+                    .column("f_timestamp_ltz2", DataTypes.TIMESTAMP_LTZ(6))
+                    .column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3))
+                    .column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6))
+                    .column("f_binary", DataTypes.BINARY(4))
+                    .column("f_date", DataTypes.DATE())
+                    .column("f_time", DataTypes.TIME())
+                    .column("f_char", DataTypes.CHAR(3))
+                    .column("f_bytes", DataTypes.BYTES())
+                    .primaryKey("f_int")
+                    .build();
+
+    private static final Schema logSchema =
+            Schema.newBuilder()
+                    .column("f_int", DataTypes.INT())
+                    .column("f_str", DataTypes.STRING())
+                    .build();
+
     @BeforeAll
     protected static void beforeAll() {
         FlinkIcebergTieringTestBase.beforeAll();
@@ -68,7 +107,7 @@ class IcebergTieringITCase extends FlinkIcebergTieringTestBase {
     void testTiering() throws Exception {
         // create a pk table, write some records and wait until snapshot finished
         TablePath t1 = TablePath.of(DEFAULT_DB, "pkTable");
-        long t1Id = createPkTable(t1);
+        long t1Id = createPkTable(t1, 1, false, pkSchema);
         TableBucket t1Bucket = new TableBucket(t1Id, 0);
         // write records
         List<InternalRow> rows =
@@ -239,6 +278,49 @@ class IcebergTieringITCase extends FlinkIcebergTieringTestBase {
         }
     }
 
+    private void checkDataInIcebergPrimaryKeyTable(
+            TablePath tablePath, List<InternalRow> expectedRows) throws Exception {
+        Iterator<Record> acturalIterator = getIcebergRecords(tablePath).iterator();
+        Iterator<InternalRow> iterator = expectedRows.iterator();
+        while (iterator.hasNext() && acturalIterator.hasNext()) {
+            InternalRow row = iterator.next();
+            Record record = acturalIterator.next();
+            assertThat(record.get(0)).isEqualTo(row.getBoolean(0));
+            assertThat(record.get(1)).isEqualTo((int) row.getByte(1));
+            assertThat(record.get(2)).isEqualTo((int) row.getShort(2));
+            assertThat(record.get(3)).isEqualTo(row.getInt(3));
+            assertThat(record.get(4)).isEqualTo(row.getLong(4));
+            assertThat(record.get(5)).isEqualTo(row.getFloat(5));
+            assertThat(record.get(6)).isEqualTo(row.getDouble(6));
+            assertThat(record.get(7)).isEqualTo(row.getString(7).toString());
+            // Iceberg expects BigDecimal for decimal types.
+            assertThat(record.get(8)).isEqualTo(row.getDecimal(8, 5, 2).toBigDecimal());
+            assertThat(record.get(9)).isEqualTo(row.getDecimal(9, 20, 0).toBigDecimal());
+            assertThat(record.get(10))
+                    .isEqualTo(
+                            OffsetDateTime.ofInstant(
+                                    row.getTimestampLtz(10, 3).toInstant(), ZoneOffset.UTC));
+            assertThat(record.get(11))
+                    .isEqualTo(
+                            OffsetDateTime.ofInstant(
+                                    row.getTimestampLtz(11, 6).toInstant(), ZoneOffset.UTC));
+            assertThat(record.get(12)).isEqualTo(row.getTimestampNtz(12, 6).toLocalDateTime());
+            assertThat(record.get(13)).isEqualTo(row.getTimestampNtz(13, 6).toLocalDateTime());
+            // Iceberg's Record interface expects ByteBuffer for binary types.
+            assertThat(record.get(14)).isEqualTo(ByteBuffer.wrap(row.getBinary(14, 4)));
+            assertThat(record.get(15))
+                    .isEqualTo(DateTimeUtils.toLocalDate(row.getInt(15)))
+                    .isEqualTo(LocalDate.of(2023, 10, 25));
+            assertThat(record.get(16))
+                    .isEqualTo(DateTimeUtils.toLocalTime(row.getInt(16)))
+                    .isEqualTo(LocalTime.of(9, 30, 0, 0));
+            assertThat(record.get(17)).isEqualTo(row.getChar(17, 3).toString());
+            assertThat(record.get(18)).isEqualTo(ByteBuffer.wrap(row.getBytes(18)));
+        }
+        assertThat(acturalIterator.hasNext()).isFalse();
+        assertThat(iterator.hasNext()).isFalse();
+    }
+
     private Tuple2<Long, TableDescriptor> createPartitionedTable(TablePath partitionedTablePath)
             throws Exception {
         TableDescriptor partitionedTableDescriptor =
@@ -265,7 +347,7 @@ class IcebergTieringITCase extends FlinkIcebergTieringTestBase {
     private void testLogTableTiering() throws Exception {
         // then, create another log table
         TablePath t2 = TablePath.of(DEFAULT_DB, "logTable");
-        long t2Id = createLogTable(t2);
+        long t2Id = createLogTable(t2, 1, false, logSchema);
         TableBucket t2Bucket = new TableBucket(t2Id, 0);
         List<InternalRow> flussRows = new ArrayList<>();
         List<InternalRow> rows;
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializerTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializerTest.java
index 207714305..d44125857 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializerTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializerTest.java
@@ -79,7 +79,9 @@ class IcebergWriteResultSerializerTest {
         // with rewrite result
         RewriteDataFileResult rewriteDataFileResult =
                 new RewriteDataFileResult(
-                        Collections.singletonList(dataFile), Collections.singletonList(dataFile));
+                        1L,
+                        Collections.singletonList(dataFile),
+                        Collections.singletonList(dataFile));
         originalResult = new IcebergWriteResult(writeResult, rewriteDataFileResult);
         serializedData = serializer.serialize(originalResult);
         deserializedResult = serializer.deserialize(serializer.getVersion(), serializedData);

