This is an automated email from the ASF dual-hosted git repository.

mehulbatra pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new ae7387e38 [lake/iceberg] Add iceberg it case (#1572)
ae7387e38 is described below

commit ae7387e387168fe97fd2cfb0a493aeb60bfdb5ca
Author: yuxia Luo <luoyu...@alumni.sjtu.edu.cn>
AuthorDate: Fri Aug 22 17:31:15 2025 +0800

    [lake/iceberg] Add iceberg it case (#1572)
    
    * add iceberg it case
    
    * address comments
---
 fluss-lake/fluss-lake-iceberg/pom.xml              |  83 +++
 .../testutils/FlinkIcebergTieringTestBase.java}    | 554 ++++++++++-----------
 .../lake/iceberg/tiering/IcebergTieringITCase.java | 205 ++++++++
 .../iceberg/{ => tiering}/IcebergTieringTest.java  |  21 +-
 .../testutils/FlinkPaimonTieringTestBase.java      |  14 -
 5 files changed, 568 insertions(+), 309 deletions(-)

diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml 
b/fluss-lake/fluss-lake-iceberg/pom.xml
index 274de4919..e0443c49b 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -68,6 +68,8 @@
             <artifactId>iceberg-bundled-guava</artifactId>
             <version>${iceberg.version}</version>
         </dependency>
+
+        <!-- test dependency -->
         <dependency>
             <groupId>com.alibaba.fluss</groupId>
             <artifactId>fluss-common</artifactId>
@@ -80,6 +82,33 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba.fluss</groupId>
+            <artifactId>fluss-server</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.fluss</groupId>
+            <artifactId>fluss-server</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba.fluss</groupId>
+            <artifactId>fluss-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba.fluss</groupId>
+            <artifactId>fluss-test-utils</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-core</artifactId>
@@ -130,6 +159,22 @@
                     <artifactId>commons-io</artifactId>
                     <groupId>commons-io</groupId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-framework</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-recipes</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
@@ -140,6 +185,44 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.alibaba.fluss</groupId>
+            <artifactId>fluss-flink-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-test-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+
     </dependencies>
 
     <build>
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
similarity index 53%
copy from 
fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
copy to 
fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index 8abb04d35..b4abe4ab5 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,7 +16,7 @@
  * limitations under the License.
  */
 
-package com.alibaba.fluss.lake.paimon.testutils;
+package com.alibaba.fluss.lake.iceberg.testutils;
 
 import com.alibaba.fluss.client.Connection;
 import com.alibaba.fluss.client.ConnectionFactory;
@@ -43,40 +44,48 @@ import com.alibaba.fluss.types.DataTypes;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.CloseableIterator;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.parquet.Parquet;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.nio.file.Files;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
 import static 
com.alibaba.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
+import static 
com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
 import static com.alibaba.fluss.testutils.DataTestUtils.row;
 import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
-import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitUntil;
 import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitValue;
+import static org.apache.iceberg.expressions.Expressions.equal;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test base for sync to paimon by Flink. */
-public class FlinkPaimonTieringTestBase {
-    protected static final String DEFAULT_DB = "fluss";
+/** Test base for tiering to Iceberg by Flink. */
+public class FlinkIcebergTieringTestBase {
 
     @RegisterExtension
     public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
@@ -85,31 +94,31 @@ public class FlinkPaimonTieringTestBase {
                     .setNumOfTabletServers(3)
                     .build();
 
-    protected static final String CATALOG_NAME = "testcatalog";
     protected StreamExecutionEnvironment execEnv;
 
     protected static Connection conn;
     protected static Admin admin;
     protected static Configuration clientConf;
     protected static String warehousePath;
-    protected static Catalog paimonCatalog;
+    protected static Catalog icebergCatalog;
 
     private static Configuration initConfig() {
         Configuration conf = new Configuration();
         conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
-                // not to clean snapshots for test purpose
                 .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 
Integer.MAX_VALUE);
-        conf.setString("datalake.format", "paimon");
-        conf.setString("datalake.paimon.metastore", "filesystem");
+
+        // Configure the tiering sink to be Iceberg
+        conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.ICEBERG);
+        conf.setString("datalake.iceberg.type", "hadoop");
         try {
             warehousePath =
-                    Files.createTempDirectory("fluss-testing-datalake-tiered")
+                    Files.createTempDirectory("fluss-testing-iceberg-tiered")
                             .resolve("warehouse")
                             .toString();
         } catch (Exception e) {
-            throw new FlussRuntimeException("Failed to create warehouse path");
+            throw new FlussRuntimeException("Failed to create Iceberg 
warehouse path", e);
         }
-        conf.setString("datalake.paimon.warehouse", warehousePath);
+        conf.setString("datalake.iceberg.warehouse", warehousePath);
         return conf;
     }
 
@@ -118,33 +127,7 @@ public class FlinkPaimonTieringTestBase {
         clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
         conn = ConnectionFactory.createConnection(clientConf);
         admin = conn.getAdmin();
-        paimonCatalog = getPaimonCatalog();
-    }
-
-    @BeforeEach
-    public void beforeEach() {
-        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-        execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
-        execEnv.setParallelism(2);
-        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-    }
-
-    protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) 
throws Exception {
-        Configuration flussConfig = new Configuration(clientConf);
-        flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L));
-        return LakeTieringJobBuilder.newBuilder(
-                        execEnv,
-                        flussConfig,
-                        Configuration.fromMap(getPaimonCatalogConf()),
-                        DataLakeFormat.PAIMON.toString())
-                .build();
-    }
-
-    protected static Map<String, String> getPaimonCatalogConf() {
-        Map<String, String> paimonConf = new HashMap<>();
-        paimonConf.put("metastore", "filesystem");
-        paimonConf.put("warehouse", warehousePath);
-        return paimonConf;
+        icebergCatalog = getIcebergCatalog();
     }
 
     @AfterAll
@@ -157,106 +140,48 @@ public class FlinkPaimonTieringTestBase {
             conn.close();
             conn = null;
         }
-    }
-
-    protected long createTable(TablePath tablePath, TableDescriptor 
tableDescriptor)
-            throws Exception {
-        admin.createTable(tablePath, tableDescriptor, true).get();
-        return admin.getTableInfo(tablePath).get().getTableId();
-    }
-
-    protected void waitUntilSnapshot(long tableId, int bucketNum, long 
snapshotId) {
-        for (int i = 0; i < bucketNum; i++) {
-            TableBucket tableBucket = new TableBucket(tableId, i);
-            FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, 
snapshotId);
-        }
-    }
-
-    protected void writeRows(TablePath tablePath, List<InternalRow> rows, 
boolean append)
-            throws Exception {
-        try (Table table = conn.getTable(tablePath)) {
-            TableWriter tableWriter;
-            if (append) {
-                tableWriter = table.newAppend().createWriter();
-            } else {
-                tableWriter = table.newUpsert().createWriter();
-            }
-            for (InternalRow row : rows) {
-                if (tableWriter instanceof AppendWriter) {
-                    ((AppendWriter) tableWriter).append(row);
-                } else {
-                    ((UpsertWriter) tableWriter).upsert(row);
-                }
-            }
-            tableWriter.flush();
+        if (icebergCatalog instanceof Closeable) {
+            ((Closeable) icebergCatalog).close();
+            icebergCatalog = null;
         }
     }
 
-    protected Map<String, List<InternalRow>> writeRowsIntoPartitionedTable(
-            TablePath tablePath,
-            TableDescriptor tableDescriptor,
-            Map<Long, String> partitionNameByIds)
-            throws Exception {
-        List<InternalRow> rows = new ArrayList<>();
-        Map<String, List<InternalRow>> writtenRowsByPartition = new 
HashMap<>();
-        for (String partitionName : partitionNameByIds.values()) {
-            List<InternalRow> partitionRows =
-                    Arrays.asList(
-                            row(11, "v1", partitionName),
-                            row(12, "v2", partitionName),
-                            row(13, "v3", partitionName));
-            rows.addAll(partitionRows);
-            writtenRowsByPartition.put(partitionName, partitionRows);
-        }
-
-        writeRows(tablePath, rows, !tableDescriptor.hasPrimaryKey());
-        return writtenRowsByPartition;
+    @BeforeEach
+    public void beforeEach() {
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        execEnv.setParallelism(2);
     }
 
-    /**
-     * Wait until the default number of partitions is created. Return the map 
from partition id to
-     * partition name. .
-     */
-    public static Map<Long, String> waitUntilPartitions(
-            ZooKeeperClient zooKeeperClient, TablePath tablePath) {
-        return waitUntilPartitions(
-                zooKeeperClient,
-                tablePath,
-                
ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
+    protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) 
throws Exception {
+        Configuration flussConfig = new Configuration(clientConf);
+        flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L));
+        return LakeTieringJobBuilder.newBuilder(
+                        execEnv,
+                        flussConfig,
+                        Configuration.fromMap(getIcebergCatalogConf()),
+                        DataLakeFormat.ICEBERG.toString())
+                .build();
     }
 
-    public static Map<Long, String> waitUntilPartitions(TablePath tablePath) {
-        return waitUntilPartitions(
-                FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(),
-                tablePath,
-                
ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
+    protected static Map<String, String> getIcebergCatalogConf() {
+        Map<String, String> icebergConf = new HashMap<>();
+        icebergConf.put("type", "hadoop");
+        icebergConf.put("warehouse", warehousePath);
+        return icebergConf;
     }
 
-    /**
-     * Wait until the given number of partitions is created. Return the map 
from partition id to
-     * partition name.
-     */
-    public static Map<Long, String> waitUntilPartitions(
-            ZooKeeperClient zooKeeperClient, TablePath tablePath, int 
expectPartitions) {
-        return waitValue(
-                () -> {
-                    Map<Long, String> gotPartitions =
-                            zooKeeperClient.getPartitionIdAndNames(tablePath);
-                    return expectPartitions == gotPartitions.size()
-                            ? Optional.of(gotPartitions)
-                            : Optional.empty();
-                },
-                Duration.ofMinutes(1),
-                String.format("expect %d table partition has not been 
created", expectPartitions));
-    }
-
-    protected static Catalog getPaimonCatalog() {
-        Map<String, String> catalogOptions = getPaimonCatalogConf();
-        return 
CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(catalogOptions)));
+    protected static Catalog getIcebergCatalog() {
+        HadoopCatalog catalog = new HadoopCatalog();
+        catalog.setConf(new org.apache.hadoop.conf.Configuration());
+        Map<String, String> properties = new HashMap<>();
+        properties.put("warehouse", warehousePath);
+        catalog.initialize("hadoop", properties);
+        return catalog;
     }
 
-    protected Replica getLeaderReplica(TableBucket tableBucket) {
-        return FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket);
+    protected long createPkTable(TablePath tablePath) throws Exception {
+        return createPkTable(tablePath, 1);
     }
 
     protected long createLogTable(TablePath tablePath) throws Exception {
@@ -289,61 +214,6 @@ public class FlinkPaimonTieringTestBase {
         return createTable(tablePath, tableBuilder.build());
     }
 
-    protected long createFullTypeLogTable(TablePath tablePath, int bucketNum, 
boolean isPartitioned)
-            throws Exception {
-        Schema.Builder schemaBuilder =
-                Schema.newBuilder()
-                        .column("f_boolean", DataTypes.BOOLEAN())
-                        .column("f_byte", DataTypes.TINYINT())
-                        .column("f_short", DataTypes.SMALLINT())
-                        .column("f_int", DataTypes.INT())
-                        .column("f_long", DataTypes.BIGINT())
-                        .column("f_float", DataTypes.FLOAT())
-                        .column("f_double", DataTypes.DOUBLE())
-                        .column("f_string", DataTypes.STRING())
-                        .column("f_decimal1", DataTypes.DECIMAL(5, 2))
-                        .column("f_decimal2", DataTypes.DECIMAL(20, 0))
-                        .column("f_timestamp_ltz1", DataTypes.TIMESTAMP_LTZ(3))
-                        .column("f_timestamp_ltz2", DataTypes.TIMESTAMP_LTZ(6))
-                        .column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3))
-                        .column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6))
-                        .column("f_binary", DataTypes.BINARY(4));
-
-        TableDescriptor.Builder tableBuilder =
-                TableDescriptor.builder()
-                        .distributedBy(bucketNum, "f_int")
-                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
-                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
-
-        if (isPartitioned) {
-            schemaBuilder.column("p", DataTypes.STRING());
-            tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, 
true);
-            tableBuilder.partitionedBy("p");
-            tableBuilder.property(
-                    ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, 
AutoPartitionTimeUnit.YEAR);
-        }
-        tableBuilder.schema(schemaBuilder.build());
-        return createTable(tablePath, tableBuilder.build());
-    }
-
-    protected long createPrimaryKeyTable(
-            TablePath tablePath, int bucketNum, List<Schema.Column> columns) 
throws Exception {
-        Schema.Builder schemaBuilder =
-                
Schema.newBuilder().fromColumns(columns).primaryKey(columns.get(0).getName());
-
-        TableDescriptor.Builder tableBuilder =
-                TableDescriptor.builder()
-                        .distributedBy(bucketNum)
-                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
-                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
-        tableBuilder.schema(schemaBuilder.build());
-        return createTable(tablePath, tableBuilder.build());
-    }
-
-    protected long createPkTable(TablePath tablePath) throws Exception {
-        return createPkTable(tablePath, 1);
-    }
-
     protected long createPkTable(TablePath tablePath, int bucketNum) throws 
Exception {
         TableDescriptor table1Descriptor =
                 TableDescriptor.builder()
@@ -360,41 +230,10 @@ public class FlinkPaimonTieringTestBase {
         return createTable(tablePath, table1Descriptor);
     }
 
-    protected void dropTable(TablePath tablePath) throws Exception {
-        admin.dropTable(tablePath, false).get();
-        Identifier tableIdentifier = toPaimonIdentifier(tablePath);
-        try {
-            paimonCatalog.dropTable(tableIdentifier, false);
-        } catch (Catalog.TableNotExistException e) {
-            // do nothing, table not exists
-        }
-    }
-
-    private Identifier toPaimonIdentifier(TablePath tablePath) {
-        return Identifier.create(tablePath.getDatabaseName(), 
tablePath.getTableName());
-    }
-
-    protected void assertReplicaStatus(
-            TablePath tablePath,
-            long tableId,
-            int bucketCount,
-            boolean isPartitioned,
-            Map<TableBucket, Long> expectedLogEndOffset) {
-        if (isPartitioned) {
-            Map<Long, String> partitionById =
-                    
waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), tablePath);
-            for (Long partitionId : partitionById.keySet()) {
-                for (int i = 0; i < bucketCount; i++) {
-                    TableBucket tableBucket = new TableBucket(tableId, 
partitionId, i);
-                    assertReplicaStatus(tableBucket, 
expectedLogEndOffset.get(tableBucket));
-                }
-            }
-        } else {
-            for (int i = 0; i < bucketCount; i++) {
-                TableBucket tableBucket = new TableBucket(tableId, i);
-                assertReplicaStatus(tableBucket, 
expectedLogEndOffset.get(tableBucket));
-            }
-        }
+    protected long createTable(TablePath tablePath, TableDescriptor 
tableDescriptor)
+            throws Exception {
+        admin.createTable(tablePath, tableDescriptor, true).get();
+        return admin.getTableInfo(tablePath).get().getTableId();
     }
 
     protected void assertReplicaStatus(TableBucket tb, long 
expectedLogEndOffset) {
@@ -409,70 +248,219 @@ public class FlinkPaimonTieringTestBase {
                 });
     }
 
-    protected void waitUntilBucketSynced(
-            TablePath tablePath, long tableId, int bucketCount, boolean 
isPartition) {
-        if (isPartition) {
-            Map<Long, String> partitionById = waitUntilPartitions(tablePath);
-            for (Long partitionId : partitionById.keySet()) {
-                for (int i = 0; i < bucketCount; i++) {
-                    TableBucket tableBucket = new TableBucket(tableId, 
partitionId, i);
-                    waitUntilBucketSynced(tableBucket);
-                }
+    public static Map<Long, String> waitUntilPartitions(TablePath tablePath) {
+        return waitUntilPartitions(
+                FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(),
+                tablePath,
+                
ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
+    }
+
+    /**
+     * Wait until the given number of partitions is created. Return the map 
from partition id to
+     * partition name.
+     */
+    public static Map<Long, String> waitUntilPartitions(
+            ZooKeeperClient zooKeeperClient, TablePath tablePath, int 
expectPartitions) {
+        return waitValue(
+                () -> {
+                    Map<Long, String> gotPartitions =
+                            zooKeeperClient.getPartitionIdAndNames(tablePath);
+                    return expectPartitions == gotPartitions.size()
+                            ? Optional.of(gotPartitions)
+                            : Optional.empty();
+                },
+                Duration.ofMinutes(1),
+                String.format("expect %d table partition has not been 
created", expectPartitions));
+    }
+
+    protected Replica getLeaderReplica(TableBucket tableBucket) {
+        return FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket);
+    }
+
+    protected void writeRows(TablePath tablePath, List<InternalRow> rows, 
boolean append)
+            throws Exception {
+        try (Table table = conn.getTable(tablePath)) {
+            TableWriter tableWriter;
+            if (append) {
+                tableWriter = table.newAppend().createWriter();
+            } else {
+                tableWriter = table.newUpsert().createWriter();
             }
-        } else {
-            for (int i = 0; i < bucketCount; i++) {
-                TableBucket tableBucket = new TableBucket(tableId, i);
-                waitUntilBucketSynced(tableBucket);
+            for (InternalRow row : rows) {
+                if (tableWriter instanceof AppendWriter) {
+                    ((AppendWriter) tableWriter).append(row);
+                } else {
+                    ((UpsertWriter) tableWriter).upsert(row);
+                }
             }
+            tableWriter.flush();
         }
     }
 
-    protected void waitUntilBucketSynced(TableBucket tb) {
-        waitUntil(
-                () -> {
-                    Replica replica = getLeaderReplica(tb);
-                    return replica.getLogTablet().getLakeTableSnapshotId() >= 
0;
-                },
-                Duration.ofMinutes(2),
-                "bucket " + tb + "not synced");
+    protected void waitUntilSnapshot(long tableId, int bucketNum, long 
snapshotId) {
+        for (int i = 0; i < bucketNum; i++) {
+            TableBucket tableBucket = new TableBucket(tableId, i);
+            FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, 
snapshotId);
+        }
     }
 
-    protected void checkDataInPaimonPrimayKeyTable(
+    protected void checkDataInIcebergPrimaryKeyTable(
             TablePath tablePath, List<InternalRow> expectedRows) throws 
Exception {
-        Iterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
-                getPaimonRowCloseableIterator(tablePath);
-        for (InternalRow expectedRow : expectedRows) {
-            org.apache.paimon.data.InternalRow row = paimonRowIterator.next();
-            assertThat(row.getInt(0)).isEqualTo(expectedRow.getInt(0));
-            
assertThat(row.getString(1).toString()).isEqualTo(expectedRow.getString(1).toString());
+        try (CloseableIterator<Record> records = getIcebergRows(tablePath)) {
+            for (InternalRow row : expectedRows) {
+                Record record = records.next();
+                assertThat(record.get(0)).isEqualTo(row.getInt(0));
+                
assertThat(record.get(1)).isEqualTo(row.getString(1).toString());
+            }
+            assertThat(records.hasNext()).isFalse();
         }
     }
 
-    protected CloseableIterator<org.apache.paimon.data.InternalRow> 
getPaimonRowCloseableIterator(
-            TablePath tablePath) throws Exception {
-        Identifier tableIdentifier =
-                Identifier.create(tablePath.getDatabaseName(), 
tablePath.getTableName());
+    protected void checkDataInIcebergAppendOnlyTable(
+            TablePath tablePath, List<InternalRow> expectedRows, long 
startingOffset)
+            throws Exception {
+        try (CloseableIterator<Record> records = getIcebergRows(tablePath)) {
+            Iterator<InternalRow> flussRowIterator = expectedRows.iterator();
+            while (records.hasNext()) {
+                Record actualRecord = records.next();
+                InternalRow flussRow = flussRowIterator.next();
+                assertThat(actualRecord.get(0)).isEqualTo(flussRow.getInt(0));
+                
assertThat(actualRecord.get(1)).isEqualTo(flussRow.getString(1).toString());
+                // the idx 2 is __bucket, so use 3
+                assertThat(actualRecord.get(3)).isEqualTo(startingOffset++);
+            }
+            assertThat(flussRowIterator.hasNext()).isFalse();
+        }
+    }
 
-        paimonCatalog = getPaimonCatalog();
+    protected void checkDataInIcebergAppendOnlyPartitionedTable(
+            TablePath tablePath,
+            Map<String, String> partitionSpec,
+            List<InternalRow> expectedRows,
+            long startingOffset)
+            throws Exception {
+        try (CloseableIterator<Record> records = getIcebergRows(tablePath, 
partitionSpec)) {
+            Iterator<InternalRow> flussRowIterator = expectedRows.iterator();
+            while (records.hasNext()) {
+                Record actualRecord = records.next();
+                InternalRow flussRow = flussRowIterator.next();
+                assertThat(actualRecord.get(0)).isEqualTo(flussRow.getInt(0));
+                
assertThat(actualRecord.get(1)).isEqualTo(flussRow.getString(1).toString());
+                
assertThat(actualRecord.get(2)).isEqualTo(flussRow.getString(2).toString());
+                // the idx 3 is __bucket, so use 4
+                assertThat(actualRecord.get(4)).isEqualTo(startingOffset++);
+            }
+            assertThat(flussRowIterator.hasNext()).isFalse();
+        }
+    }
 
-        FileStoreTable table = (FileStoreTable) 
paimonCatalog.getTable(tableIdentifier);
+    private CloseableIterator<Record> getIcebergRows(TablePath tablePath) {
+        return getIcebergRows(tablePath, Collections.emptyMap());
+    }
+
+    @SuppressWarnings("resource")
+    private CloseableIterator<Record> getIcebergRows(
+            TablePath tablePath, Map<String, String> partitionSpec) {
+        org.apache.iceberg.Table table = 
icebergCatalog.loadTable(toIceberg(tablePath));
+        // is primary key, we don't care about records order,
+        // use iceberg read api directly
+        if (!table.schema().identifierFieldIds().isEmpty()) {
+            IcebergGenerics.ScanBuilder scanBuilder =
+                    filterByPartition(IcebergGenerics.read(table), 
partitionSpec);
+            return scanBuilder.build().iterator();
+        } else {
+            // is log table, we want to compare __offset column
+            // so sort data files by __offset according to the column stats
+            List<Record> records = new ArrayList<>();
+            int fieldId = 
table.schema().findField(OFFSET_COLUMN_NAME).fieldId();
+            SortedSet<DataFile> files =
+                    new TreeSet<>(
+                            (f1, f2) -> {
+                                ByteBuffer buffer1 =
+                                        (ByteBuffer)
+                                                f1.lowerBounds()
+                                                        .get(fieldId)
+                                                        
.order(ByteOrder.LITTLE_ENDIAN)
+                                                        .rewind();
+                                long offset1 = buffer1.getLong();
+                                ByteBuffer buffer2 =
+                                        (ByteBuffer)
+                                                f2.lowerBounds()
+                                                        .get(fieldId)
+                                                        
.order(ByteOrder.LITTLE_ENDIAN)
+                                                        .rewind();
+                                long offset2 = buffer2.getLong();
+                                return Long.compare(offset1, offset2);
+                            });
+
+            table.refresh();
+            TableScan tableScan = filterByPartition(table.newScan(), 
partitionSpec);
+            tableScan
+                    .includeColumnStats()
+                    .planFiles()
+                    .iterator()
+                    .forEachRemaining(fileScanTask -> 
files.add(fileScanTask.file()));
+
+            for (DataFile file : files) {
+                Iterable<Record> iterable =
+                        
Parquet.read(table.io().newInputFile(file.path().toString()))
+                                .project(table.schema())
+                                .createReaderFunc(
+                                        fileSchema ->
+                                                
GenericParquetReaders.buildReader(
+                                                        table.schema(), 
fileSchema))
+                                .build();
+                iterable.forEach(records::add);
+            }
+
+            return CloseableIterator.withClose(records.iterator());
+        }
+    }
+
+    private IcebergGenerics.ScanBuilder filterByPartition(
+            IcebergGenerics.ScanBuilder scanBuilder, Map<String, String> 
partitionSpec) {
+        for (Map.Entry<String, String> partitionKeyAndValue : 
partitionSpec.entrySet()) {
+            String partitionCol = partitionKeyAndValue.getKey();
+            String partitionValue = partitionKeyAndValue.getValue();
+            scanBuilder = scanBuilder.where(equal(partitionCol, 
partitionValue));
+        }
+        return scanBuilder;
+    }
 
-        RecordReader<org.apache.paimon.data.InternalRow> reader =
-                
table.newRead().createReader(table.newReadBuilder().newScan().plan());
-        return reader.toCloseableIterator();
+    private TableScan filterByPartition(TableScan tableScan, Map<String, 
String> partitionSpec) {
+        for (Map.Entry<String, String> partitionKeyAndValue : 
partitionSpec.entrySet()) {
+            String partitionCol = partitionKeyAndValue.getKey();
+            String partitionValue = partitionKeyAndValue.getValue();
+            tableScan = tableScan.filter(equal(partitionCol, partitionValue));
+        }
+        return tableScan;
     }
 
-    protected void checkSnapshotPropertyInPaimon(
+    protected void checkSnapshotPropertyInIceberg(
             TablePath tablePath, Map<String, String> expectedProperties) 
throws Exception {
-        FileStoreTable table =
-                (FileStoreTable)
-                        getPaimonCatalog()
-                                .getTable(
-                                        Identifier.create(
-                                                tablePath.getDatabaseName(),
-                                                tablePath.getTableName()));
-        Snapshot snapshot = table.snapshotManager().latestSnapshot();
-        assertThat(snapshot).isNotNull();
-        assertThat(snapshot.properties()).isEqualTo(expectedProperties);
+        org.apache.iceberg.Table table = 
icebergCatalog.loadTable(toIceberg(tablePath));
+        Snapshot snapshot = table.currentSnapshot();
+        
assertThat(snapshot.summary()).containsAllEntriesOf(expectedProperties);
+    }
+
+    protected Map<String, List<InternalRow>> writeRowsIntoPartitionedTable(
+            TablePath tablePath,
+            TableDescriptor tableDescriptor,
+            Map<Long, String> partitionNameByIds)
+            throws Exception {
+        List<InternalRow> rows = new ArrayList<>();
+        Map<String, List<InternalRow>> writtenRowsByPartition = new 
HashMap<>();
+        for (String partitionName : partitionNameByIds.values()) {
+            List<InternalRow> partitionRows =
+                    Arrays.asList(
+                            row(11, "v1", partitionName),
+                            row(12, "v2", partitionName),
+                            row(13, "v3", partitionName));
+            rows.addAll(partitionRows);
+            writtenRowsByPartition.put(partitionName, partitionRows);
+        }
+
+        writeRows(tablePath, rows, !tableDescriptor.hasPrimaryKey());
+        return writtenRowsByPartition;
     }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
new file mode 100644
index 000000000..0b3e5f2dd
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import com.alibaba.fluss.config.AutoPartitionTimeUnit;
+import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.utils.types.Tuple2;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+
+/** The ITCase for tiering into iceberg. */
+class IcebergTieringITCase extends FlinkIcebergTieringTestBase {
+
+    protected static final String DEFAULT_DB = "fluss";
+
+    private static StreamExecutionEnvironment execEnv;
+
+    @BeforeAll
+    protected static void beforeAll() {
+        FlinkIcebergTieringTestBase.beforeAll();
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        execEnv.setParallelism(2);
+        execEnv.enableCheckpointing(1000);
+    }
+
+    @Test
+    void testTiering() throws Exception {
+        // create a pk table, write some records and wait until snapshot 
finished
+        TablePath t1 = TablePath.of(DEFAULT_DB, "pkTable");
+        long t1Id = createPkTable(t1);
+        TableBucket t1Bucket = new TableBucket(t1Id, 0);
+        // write records
+        List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), 
row(3, "v3"));
+        writeRows(t1, rows, false);
+        waitUntilSnapshot(t1Id, 1, 0);
+
+        // then start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            // check the status of replica after synced
+            assertReplicaStatus(t1Bucket, 3);
+
+            checkDataInIcebergPrimaryKeyTable(t1, rows);
+            // check snapshot property in iceberg
+            Map<String, String> properties =
+                    new HashMap<String, String>() {
+                        {
+                            put(
+                                    FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                    "[{\"bucket_id\":0,\"log_offset\":3}]");
+                        }
+                    };
+            checkSnapshotPropertyInIceberg(t1, properties);
+
+            // test log table
+            testLogTableTiering();
+
+            // then write data to the pk tables
+            // write records
+            rows = Arrays.asList(row(1, "v111"), row(2, "v222"), row(3, 
"v333"));
+            // write records
+            writeRows(t1, rows, false);
+
+            // check the status of replica of t1 after synced
+            // not check start offset since we won't
+            // update start log offset for primary key table
+            // 3 initial + (3 deletes + 3 inserts) = 9
+            assertReplicaStatus(t1Bucket, 9);
+
+            checkDataInIcebergPrimaryKeyTable(t1, rows);
+
+            // then create partitioned table and wait partitions are ready
+            testPartitionedTableTiering();
+        } finally {
+            jobClient.cancel().get();
+        }
+    }
+
+    private Tuple2<Long, TableDescriptor> createPartitionedTable(TablePath 
partitionedTablePath)
+            throws Exception {
+        TableDescriptor partitionedTableDescriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("id", DataTypes.INT())
+                                        .column("name", DataTypes.STRING())
+                                        .column("date", DataTypes.STRING())
+                                        .build())
+                        .partitionedBy("date")
+                        .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, 
true)
+                        .property(
+                                ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT,
+                                AutoPartitionTimeUnit.YEAR)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500))
+                        .build();
+        return Tuple2.of(
+                createTable(partitionedTablePath, partitionedTableDescriptor),
+                partitionedTableDescriptor);
+    }
+
+    private void testLogTableTiering() throws Exception {
+        // then, create another log table
+        TablePath t2 = TablePath.of(DEFAULT_DB, "logTable");
+        long t2Id = createLogTable(t2);
+        TableBucket t2Bucket = new TableBucket(t2Id, 0);
+        List<InternalRow> flussRows = new ArrayList<>();
+        List<InternalRow> rows;
+        // write records
+        for (int i = 0; i < 10; i++) {
+            rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
+            flussRows.addAll(rows);
+            // write records
+            writeRows(t2, rows, true);
+        }
+        // check the status of replica after synced;
+        // note: we can't update log start offset for unaware bucket mode log 
table
+        assertReplicaStatus(t2Bucket, 30);
+
+        // check data in iceberg
+        checkDataInIcebergAppendOnlyTable(t2, flussRows, 0);
+    }
+
+    private void testPartitionedTableTiering() throws Exception {
+        TablePath partitionedTablePath = TablePath.of(DEFAULT_DB, 
"partitionedTable");
+        Tuple2<Long, TableDescriptor> tableIdAndDescriptor =
+                createPartitionedTable(partitionedTablePath);
+        Map<Long, String> partitionNameByIds = 
waitUntilPartitions(partitionedTablePath);
+
+        // now, write rows into partitioned table
+        TableDescriptor partitionedTableDescriptor = tableIdAndDescriptor.f1;
+        Map<String, List<InternalRow>> writtenRowsByPartition =
+                writeRowsIntoPartitionedTable(
+                        partitionedTablePath, partitionedTableDescriptor, 
partitionNameByIds);
+        long tableId = tableIdAndDescriptor.f0;
+
+        // wait until synced to iceberg
+        for (Long partitionId : partitionNameByIds.keySet()) {
+            TableBucket tableBucket = new TableBucket(tableId, partitionId, 0);
+            assertReplicaStatus(tableBucket, 3);
+        }
+
+        // now, let's check data in iceberg per partition
+        // check data in iceberg
+        String partitionCol = 
partitionedTableDescriptor.getPartitionKeys().get(0);
+        for (String partitionName : partitionNameByIds.values()) {
+            checkDataInIcebergAppendOnlyPartitionedTable(
+                    partitionedTablePath,
+                    Collections.singletonMap(partitionCol, partitionName),
+                    writtenRowsByPartition.get(partitionName),
+                    0);
+        }
+
+        Map<String, String> properties =
+                new HashMap<String, String>() {
+                    {
+                        put(
+                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                "["
+                                        + 
"{\"partition_id\":0,\"bucket_id\":0,\"partition_name\":\"date=2025\",\"log_offset\":3},"
+                                        + 
"{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"date=2026\",\"log_offset\":3}"
+                                        + "]");
+                    }
+                };
+
+        checkSnapshotPropertyInIceberg(partitionedTablePath, properties);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringTest.java
similarity index 95%
rename from 
fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
rename to 
fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringTest.java
index 8a666b677..c55b834f7 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringTest.java
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,14 +16,10 @@
  * limitations under the License.
  */
 
-package com.alibaba.fluss.lake.iceberg;
+package com.alibaba.fluss.lake.iceberg.tiering;
 
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.lake.committer.LakeCommitter;
-import com.alibaba.fluss.lake.iceberg.tiering.IcebergCatalogProvider;
-import com.alibaba.fluss.lake.iceberg.tiering.IcebergCommittable;
-import com.alibaba.fluss.lake.iceberg.tiering.IcebergLakeTieringFactory;
-import com.alibaba.fluss.lake.iceberg.tiering.IcebergWriteResult;
 import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
 import com.alibaba.fluss.lake.writer.LakeWriter;
 import com.alibaba.fluss.lake.writer.WriterInitContext;
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 8abb04d35..3dcfc826d 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -326,20 +326,6 @@ public class FlinkPaimonTieringTestBase {
         return createTable(tablePath, tableBuilder.build());
     }
 
-    protected long createPrimaryKeyTable(
-            TablePath tablePath, int bucketNum, List<Schema.Column> columns) 
throws Exception {
-        Schema.Builder schemaBuilder =
-                
Schema.newBuilder().fromColumns(columns).primaryKey(columns.get(0).getName());
-
-        TableDescriptor.Builder tableBuilder =
-                TableDescriptor.builder()
-                        .distributedBy(bucketNum)
-                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
-                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
-        tableBuilder.schema(schemaBuilder.build());
-        return createTable(tablePath, tableBuilder.build());
-    }
-
     protected long createPkTable(TablePath tablePath) throws Exception {
         return createPkTable(tablePath, 1);
     }

Reply via email to