This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ce837889 [lake] Re-initial tiering splits for tables if reader failover happen (#1984)
2ce837889 is described below

commit 2ce837889a6de56f558f7c5ea4766783dd63304e
Author: Junbo Wang <[email protected]>
AuthorDate: Thu Jan 15 10:35:53 2026 +0800

    [lake] Re-initial tiering splits for tables if reader failover happen (#1984)
---
 .../fluss/flink/adapter/RuntimeContextAdapter.java |  34 ----
 .../tiering/committer/TieringCommitOperator.java   |  28 +--
 .../flink/tiering/event/TieringFailOverEvent.java  |  26 ---
 .../source/enumerator/TieringSourceEnumerator.java |  80 ++++++--
 .../fluss/flink/tiering/FlinkTieringTestBase.java  | 164 +++++++++++++++++
 .../fluss/flink/tiering/TieringFailoverITCase.java | 177 ++++++++++++++++++
 .../FlussMockSplitEnumeratorContext.java           |  85 +++++++++
 .../enumerator/TieringSourceEnumeratorTest.java    | 118 ++++++------
 .../fluss/lake/values/TestingValuesLake.java       | 202 +++++++++++++++++++++
 .../lake/values/TestingValuesLakeCatalog.java      |  43 +++++
 .../lake/values/TestingValuesLakeStorage.java      |  44 +++++
 .../values/TestingValuesLakeStoragePlugin.java     |  42 +++++
 .../values/tiering/TestingValuesLakeCommitter.java | 121 ++++++++++++
 .../tiering/TestingValuesLakeTieringFactory.java   |  60 ++++++
 .../values/tiering/TestingValuesLakeWriter.java    | 100 ++++++++++
 ...apache.fluss.lake.lakestorage.LakeStoragePlugin |  19 ++
 .../lake/iceberg/tiering/IcebergTieringITCase.java |   8 +-
 17 files changed, 1185 insertions(+), 166 deletions(-)

diff --git a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java
deleted file mode 100644
index be97b7048..000000000
--- a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.flink.adapter;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-/**
- * An adapter for Flink {@link RuntimeContext} class. The {@link RuntimeContext} class added the
- * `getJobInfo` and `getTaskInfo` methods in version 1.19 and deprecated many methods, such as
- * `getAttemptNumber`.
- *
- * <p>TODO: remove this class when no longer support flink 1.18.
- */
-public class RuntimeContextAdapter {
-
-    public static int getAttemptNumber(RuntimeContext runtimeContext) {
-        return runtimeContext.getAttemptNumber();
-    }
-}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index 79bcc5f85..49afd5c44 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -23,10 +23,8 @@ import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.client.metadata.LakeSnapshot;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
-import org.apache.fluss.flink.adapter.RuntimeContextAdapter;
 import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
 import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
-import org.apache.fluss.flink.tiering.event.TieringFailOverEvent;
 import org.apache.fluss.flink.tiering.source.TableBucketWriteResult;
 import org.apache.fluss.flink.tiering.source.TieringSource;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
@@ -40,13 +38,10 @@ import org.apache.fluss.utils.ExceptionUtils;
 
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
-import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import javax.annotation.Nullable;
 
@@ -108,29 +103,14 @@ public class TieringCommitOperator<WriteResult, Committable>
         this.collectedTableBucketWriteResults = new HashMap<>();
         this.flussConfig = flussConf;
         this.lakeTieringConfig = lakeTieringConfig;
-        this.operatorEventGateway =
-                parameters
-                        .getOperatorEventDispatcher()
-                        .getOperatorEventGateway(TieringSource.TIERING_SOURCE_OPERATOR_UID);
         this.setup(
                 parameters.getContainingTask(),
                 parameters.getStreamConfig(),
                 parameters.getOutput());
-    }
-
-    @Override
-    public void setup(
-            StreamTask<?, ?> containingTask,
-            StreamConfig config,
-            Output<StreamRecord<CommittableMessage<Committable>>> output) {
-        super.setup(containingTask, config, output);
-        int attemptNumber = RuntimeContextAdapter.getAttemptNumber(getRuntimeContext());
-        if (attemptNumber > 0) {
-            LOG.info("Send TieringFailoverEvent, current attempt number: {}", attemptNumber);
-            // attempt number is greater than zero, the job must failover
-            operatorEventGateway.sendEventToCoordinator(
-                    new SourceEventWrapper(new TieringFailOverEvent()));
-        }
+        this.operatorEventGateway =
+                parameters
+                        .getOperatorEventDispatcher()
+                        .getOperatorEventGateway(TieringSource.TIERING_SOURCE_OPERATOR_UID);
     }
 
     @Override
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringFailOverEvent.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringFailOverEvent.java
deleted file mode 100644
index 8cbe87fd1..000000000
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringFailOverEvent.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.flink.tiering.event;
-
-import org.apache.flink.api.connector.source.SourceEvent;
-
-/** SourceEvent used to represent tiering is failover. */
-public class TieringFailOverEvent implements SourceEvent {
-    private static final long serialVersionUID = 1L;
-}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
index 1ef73eb39..c1c26390a 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
@@ -26,7 +26,6 @@ import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.metrics.FlinkMetricRegistry;
 import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
 import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
-import org.apache.fluss.flink.tiering.event.TieringFailOverEvent;
 import org.apache.fluss.flink.tiering.source.split.TieringSplit;
 import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator;
 import org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState;
@@ -41,6 +40,7 @@ import org.apache.fluss.rpc.messages.PbLakeTieringTableInfo;
 import org.apache.fluss.rpc.metrics.ClientMetricGroup;
 import org.apache.fluss.utils.MapUtils;
 
+import org.apache.flink.api.connector.source.ReaderInfo;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
@@ -60,6 +60,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator.HeartBeatHelper.basicHeartBeat;
@@ -105,6 +106,8 @@ public class TieringSourceEnumerator
     private TieringSplitGenerator splitGenerator;
     private int flussCoordinatorEpoch;
 
+    private volatile boolean isFailOvering = false;
+
     private volatile boolean closed = false;
 
     public TieringSourceEnumerator(
@@ -179,11 +182,53 @@ public class TieringSourceEnumerator
     @Override
     public void addReader(int subtaskId) {
         LOG.info("Adding reader: {} to Tiering Source enumerator.", subtaskId);
-        if (context.registeredReaders().containsKey(subtaskId)) {
+        Map<Integer, ReaderInfo> readerByAttempt =
+                context.registeredReadersOfAttempts().get(subtaskId);
+        if (readerByAttempt != null && !readerByAttempt.isEmpty()) {
             readersAwaitingSplit.add(subtaskId);
+            int maxAttempt = max(readerByAttempt.keySet());
+            if (maxAttempt >= 1) {
+                if (isFailOvering) {
+                    LOG.warn(
+                            "Subtask {} (max attempt {}) registered during ongoing failover.",
+                            subtaskId,
+                            maxAttempt);
+                } else {
+                    LOG.warn(
+                            "Detected failover: subtask {} has max attempt {} > 0. Triggering global failover handling.",
+                            subtaskId,
+                            maxAttempt);
+                    // should be failover
+                    isFailOvering = true;
+                    handleSourceReaderFailOver();
+                }
+
+                // if registered readers equal to current parallelism, check whether all registered
+                // readers have same max attempt
+                if (context.registeredReadersOfAttempts().size() == context.currentParallelism()) {
+                    // Check if all readers have the same max attempt number
+                    Set<Integer> maxAttempts =
+                            context.registeredReadersOfAttempts().values().stream()
+                                    .map(_readerByAttempt -> max(_readerByAttempt.keySet()))
+                                    .collect(Collectors.toSet());
+                    int globalMaxAttempt = max(maxAttempts);
+                    if (maxAttempts.size() == 1 && globalMaxAttempt >= 1) {
+                        LOG.info(
+                                "Failover completed. All {} subtasks reached the same attempt number {}. Current registered readers are {}",
+                                context.currentParallelism(),
+                                globalMaxAttempt,
+                                context.registeredReadersOfAttempts());
+                        isFailOvering = false;
+                    }
+                }
+            }
         }
     }
 
+    private int max(Set<Integer> integers) {
+        return integers.stream().max(Integer::compareTo).orElse(-1);
+    }
+
     @Override
     public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
         if (sourceEvent instanceof FinishedTieringEvent) {
@@ -218,18 +263,23 @@ public class TieringSourceEnumerator
             }
         }
 
-        if (sourceEvent instanceof TieringFailOverEvent) {
-            LOG.info(
-                    "Receiving tiering failover event, mark current tiering table epoch {} as failed.",
-                    tieringTableEpochs);
-            // we need to make all as failed
-            failedTableEpochs.putAll(new HashMap<>(tieringTableEpochs));
-            tieringTableEpochs.clear();
-            // also clean all pending splits since we mark all as failed
-            pendingSplits.clear();
+        if (!finishedTableEpochs.isEmpty() || !failedTableEpochs.isEmpty()) {
+            // call one round of heartbeat to notify table has been finished or failed
+            this.context.callAsync(
+                    this::requestTieringTableSplitsViaHeartBeat, this::generateAndAssignSplits);
         }
+    }
 
-        if (!finishedTableEpochs.isEmpty() || !failedTableEpochs.isEmpty()) {
+    private void handleSourceReaderFailOver() {
+        LOG.info(
+                "Handling source reader fail over, mark current tiering table epoch {} as failed.",
+                tieringTableEpochs);
+        // we need to make all as failed
+        failedTableEpochs.putAll(new HashMap<>(tieringTableEpochs));
+        tieringTableEpochs.clear();
+        // also clean all pending splits since we mark all as failed
+        pendingSplits.clear();
+        if (!failedTableEpochs.isEmpty()) {
             // call one round of heartbeat to notify table has been finished or failed
             this.context.callAsync(
                     this::requestTieringTableSplitsViaHeartBeat, this::generateAndAssignSplits);
@@ -248,6 +298,10 @@ public class TieringSourceEnumerator
     }
 
     private void assignSplits() {
+        // we don't assign splits during failovering
+        if (isFailOvering) {
+            return;
+        }
         /* This method may be called from both addSplitsBack and handleSplitRequest, make it thread safe. */
         synchronized (readersAwaitingSplit) {
             if (!readersAwaitingSplit.isEmpty()) {
@@ -476,7 +530,7 @@ public class TieringSourceEnumerator
         static LakeTieringHeartbeatResponse waitHeartbeatResponse(
                 CompletableFuture<LakeTieringHeartbeatResponse> responseCompletableFuture) {
             try {
-                return responseCompletableFuture.get();
+                return responseCompletableFuture.get(3, TimeUnit.MINUTES);
             } catch (Exception e) {
                 LOG.error("Failed to wait heartbeat response due to ", e);
                 throw new FlinkRuntimeException("Failed to wait heartbeat response due to ", e);
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java
new file mode 100644
index 000000000..c87078d7f
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.writer.TableWriter;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.values.TestingValuesLake;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.replica.Replica;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test base for tiering to Values Lake by Flink. */
+class FlinkTieringTestBase {
+
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setClusterConf(initConfig())
+                    .setNumOfTabletServers(3)
+                    .build();
+
+    protected StreamExecutionEnvironment execEnv;
+
+    protected static Connection conn;
+    protected static Admin admin;
+    protected static Configuration clientConf;
+
+    private static Configuration initConfig() {
+        Configuration conf = new Configuration();
+        conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
+                .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
+
+        // Configure the tiering sink to be Lance
+        conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.LANCE);
+        return conf;
+    }
+
+    @BeforeAll
+    protected static void beforeAll() {
+        clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        conn = ConnectionFactory.createConnection(clientConf);
+        admin = conn.getAdmin();
+    }
+
+    @AfterAll
+    static void afterAll() throws Exception {
+        if (admin != null) {
+            admin.close();
+            admin = null;
+        }
+        if (conn != null) {
+            conn.close();
+            conn = null;
+        }
+    }
+
+    @BeforeEach
+    public void beforeEach() {
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        execEnv.setParallelism(2);
+    }
+
+    protected long createPkTable(
+            TablePath tablePath, int bucketNum, boolean enableAutoCompaction, Schema schema)
+            throws Exception {
+        TableDescriptor.Builder pkTableBuilder =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .distributedBy(bucketNum)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
+
+        if (enableAutoCompaction) {
+            pkTableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true");
+        }
+        return createTable(tablePath, pkTableBuilder.build());
+    }
+
+    protected long createTable(TablePath tablePath, TableDescriptor tableDescriptor)
+            throws Exception {
+        admin.createTable(tablePath, tableDescriptor, true).get();
+        return admin.getTableInfo(tablePath).get().getTableId();
+    }
+
+    protected void assertReplicaStatus(TableBucket tb, long expectedLogEndOffset) {
+        retry(
+                Duration.ofMinutes(1),
+                () -> {
+                    Replica replica = getLeaderReplica(tb);
+                    // datalake snapshot id should be updated
+                    assertThat(replica.getLogTablet().getLakeTableSnapshotId())
+                            .isGreaterThanOrEqualTo(0);
+                    assertThat(replica.getLakeLogEndOffset()).isEqualTo(expectedLogEndOffset);
+                });
+    }
+
+    protected Replica getLeaderReplica(TableBucket tableBucket) {
+        return FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket);
+    }
+
+    protected void writeRows(TablePath tablePath, List<InternalRow> rows) throws Exception {
+        try (Table table = conn.getTable(tablePath)) {
+            TableWriter tableWriter;
+            tableWriter = table.newUpsert().createWriter();
+            for (InternalRow row : rows) {
+                ((UpsertWriter) tableWriter).upsert(row);
+            }
+            tableWriter.flush();
+        }
+    }
+
+    protected void waitUntilSnapshot(long tableId, int bucketNum, long snapshotId) {
+        for (int i = 0; i < bucketNum; i++) {
+            TableBucket tableBucket = new TableBucket(tableId, i);
+            FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, snapshotId);
+        }
+    }
+
+    @SuppressWarnings("resource")
+    public List<InternalRow> getValuesRecords(TablePath tablePath) {
+        return TestingValuesLake.getResults(tablePath.toString());
+    }
+}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringFailoverITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringFailoverITCase.java
new file mode 100644
index 000000000..a8ee17a6d
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringFailoverITCase.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.tiering.committer.CommittableMessageTypeInfo;
+import org.apache.fluss.flink.tiering.committer.TieringCommitOperatorFactory;
+import org.apache.fluss.flink.tiering.source.TableBucketWriteResultTypeInfo;
+import org.apache.fluss.flink.tiering.source.TieringSource;
+import org.apache.fluss.lake.values.TestingValuesLake;
+import org.apache.fluss.lake.values.tiering.TestingValuesLakeTieringFactory;
+import org.apache.fluss.lake.writer.LakeTieringFactory;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.fluss.flink.tiering.source.TieringSource.TIERING_SOURCE_TRANSFORMATION_UID;
+import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test tiering failover. */
+class TieringFailoverITCase extends FlinkTieringTestBase {
+    protected static final String DEFAULT_DB = "fluss";
+
+    private static StreamExecutionEnvironment execEnv;
+
+    private static final Schema schema =
+            Schema.newBuilder()
+                    .column("f_int", DataTypes.INT())
+                    .column("f_string", DataTypes.STRING())
+                    .primaryKey("f_string")
+                    .build();
+
+    @BeforeAll
+    protected static void beforeAll() {
+        FlinkTieringTestBase.beforeAll();
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        execEnv.setParallelism(2);
+        execEnv.enableCheckpointing(1000);
+    }
+
+    @Test
+    void testTiering() throws Exception {
+        // create a pk table, write some records and wait until snapshot finished
+        TablePath t1 = TablePath.of(DEFAULT_DB, "pkTable");
+        long t1Id = createPkTable(t1, 1, false, schema);
+        TableBucket t1Bucket = new TableBucket(t1Id, 0);
+        // write records
+        List<InternalRow> rows = Arrays.asList(row(1, "i1"), row(2, "i2"), row(3, "i3"));
+        List<InternalRow> expectedRows = new ArrayList<>(rows);
+        writeRows(t1, rows);
+        waitUntilSnapshot(t1Id, 1, 0);
+
+        // fail the first write to the pk table
+        TestingValuesLake.failWhen(t1.toString()).failWriteOnce();
+
+        // then start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            // check the status of replica after synced
+            assertReplicaStatus(t1Bucket, 3);
+
+            checkDataInValuesTable(t1, rows);
+
+            // then write data to the pk tables
+            // write records
+            rows = Arrays.asList(row(1, "i11"), row(2, "i22"), row(3, "i33"));
+            expectedRows.addAll(rows);
+            // write records
+            writeRows(t1, rows);
+
+            // check the status of replica of t1 after synced
+            // not check start offset since we won't
+            // update start log offset for primary key table
+            assertReplicaStatus(t1Bucket, expectedRows.size());
+
+            checkDataInValuesTable(t1, expectedRows);
+        } finally {
+            jobClient.cancel().get();
+        }
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception {
+        Configuration flussConfig = new Configuration(clientConf);
+        flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L));
+
+        LakeTieringFactory lakeTieringFactory = new TestingValuesLakeTieringFactory();
+
+        // build tiering source
+        TieringSource.Builder<?> tieringSourceBuilder =
+                new TieringSource.Builder<>(flussConfig, lakeTieringFactory);
+        if (flussConfig.get(POLL_TIERING_TABLE_INTERVAL) != null) {
+            tieringSourceBuilder.withPollTieringTableIntervalMs(
+                    flussConfig.get(POLL_TIERING_TABLE_INTERVAL).toMillis());
+        }
+        TieringSource<?> tieringSource = tieringSourceBuilder.build();
+        DataStreamSource<?> source =
+                execEnv.fromSource(
+                        tieringSource,
+                        WatermarkStrategy.noWatermarks(),
+                        "TieringSource",
+                        TableBucketWriteResultTypeInfo.of(
+                                () -> lakeTieringFactory.getWriteResultSerializer()));
+
+        source.getTransformation().setUid(TIERING_SOURCE_TRANSFORMATION_UID);
+
+        source.transform(
+                        "TieringCommitter",
+                        CommittableMessageTypeInfo.of(
+                                () -> lakeTieringFactory.getCommittableSerializer()),
+                        new TieringCommitOperatorFactory(
+                                flussConfig,
+                                Configuration.fromMap(Collections.emptyMap()),
+                                lakeTieringFactory))
+                .setParallelism(1)
+                .setMaxParallelism(1)
+                .sinkTo(new DiscardingSink())
+                .name("end")
+                .setParallelism(1);
+        String jobName =
+                execEnv.getConfiguration()
+                        .getOptional(PipelineOptions.NAME)
+                        .orElse("Fluss Lake Tiering FailOver IT Test.");
+
+        return execEnv.executeAsync(jobName);
+    }
+
+    private void checkDataInValuesTable(TablePath tablePath, List<InternalRow> expectedRows)
+            throws Exception {
+        Iterator<InternalRow> actualIterator = getValuesRecords(tablePath).iterator();
+        Iterator<InternalRow> iterator = expectedRows.iterator();
+        while (iterator.hasNext() && actualIterator.hasNext()) {
+            InternalRow row = iterator.next();
+            InternalRow record = actualIterator.next();
+            assertThat(record.getInt(0)).isEqualTo(row.getInt(0));
+            assertThat(record.getString(1)).isEqualTo(row.getString(1));
+        }
+        assertThat(actualIterator.hasNext()).isFalse();
+        assertThat(iterator.hasNext()).isFalse();
+    }
+}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/FlussMockSplitEnumeratorContext.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/FlussMockSplitEnumeratorContext.java
new file mode 100644
index 000000000..2bbd11ef6
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/FlussMockSplitEnumeratorContext.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.source.enumerator;
+
+import org.apache.fluss.utils.MapUtils;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A mock extension of {@link MockSplitEnumeratorContext} for testing 
purposes that supports
+ * registering source readers with an attempt number.
+ *
+ * @param <SplitT> The type of {@link SourceSplit} used by the source.
+ */
+class FlussMockSplitEnumeratorContext<SplitT extends SourceSplit>
+        extends MockSplitEnumeratorContext<SplitT> {
+
+    private final ConcurrentMap<Integer, ConcurrentMap<Integer, ReaderInfo>> 
registeredReaders;
+
+    public FlussMockSplitEnumeratorContext(int parallelism) {
+        super(parallelism);
+        this.registeredReaders = MapUtils.newConcurrentHashMap();
+    }
+
+    public void registerSourceReader(int subtaskId, int attemptNumber, String 
location) {
+        final Map<Integer, ReaderInfo> attemptReaders =
+                registeredReaders.computeIfAbsent(subtaskId, k -> 
MapUtils.newConcurrentHashMap());
+        checkState(
+                !attemptReaders.containsKey(attemptNumber),
+                "ReaderInfo of subtask %s (#%s) already exists.",
+                subtaskId,
+                attemptNumber);
+        attemptReaders.put(attemptNumber, new ReaderInfo(subtaskId, location));
+    }
+
+    @Override
+    public void registerReader(ReaderInfo readerInfo) {
+        this.registerSourceReader(readerInfo.getSubtaskId(), 0, 
readerInfo.getLocation());
+    }
+
+    @Override
+    public Map<Integer, ReaderInfo> registeredReaders() {
+        final Map<Integer, ReaderInfo> readers = new HashMap<>();
+        for (Map.Entry<Integer, ConcurrentMap<Integer, ReaderInfo>> entry :
+                registeredReaders.entrySet()) {
+            final int subtaskIndex = entry.getKey();
+            final Map<Integer, ReaderInfo> attemptReaders = entry.getValue();
+            if (!attemptReaders.isEmpty()) {
+                int earliestAttempt = Collections.min(attemptReaders.keySet());
+                readers.put(subtaskIndex, attemptReaders.get(earliestAttempt));
+            }
+        }
+        return Collections.unmodifiableMap(readers);
+    }
+
+    @Override
+    public Map<Integer, Map<Integer, ReaderInfo>> 
registeredReadersOfAttempts() {
+        return Collections.unmodifiableMap(registeredReaders);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
index ba8c5b17a..84e5bd5db 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
@@ -20,7 +20,6 @@ package org.apache.fluss.flink.tiering.source.enumerator;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
 import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
-import org.apache.fluss.flink.tiering.event.TieringFailOverEvent;
 import org.apache.fluss.flink.tiering.source.TieringTestBase;
 import org.apache.fluss.flink.tiering.source.split.TieringLogSplit;
 import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit;
@@ -32,9 +31,7 @@ import 
org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
 import org.apache.fluss.rpc.messages.PbLakeTableOffsetForBucket;
 import org.apache.fluss.rpc.messages.PbLakeTableSnapshotInfo;
 
-import org.apache.flink.api.connector.source.ReaderInfo;
 import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -76,8 +73,8 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
         int numSubtasks = 4;
         int expectNumberOfSplits = 3;
         // test get snapshot split & log split and the assignment
-        try (MockSplitEnumeratorContext<TieringSplit> context =
-                new MockSplitEnumeratorContext<>(numSubtasks)) {
+        try (FlussMockSplitEnumeratorContext<TieringSplit> context =
+                new FlussMockSplitEnumeratorContext<>(numSubtasks)) {
             TieringSourceEnumerator enumerator =
                     new TieringSourceEnumerator(flussConf, context, 500);
 
@@ -85,10 +82,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
             assertThat(context.getSplitsAssignmentSequence()).isEmpty();
 
             // register all readers
-            for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
-                registerReader(context, enumerator, subtaskId, "localhost-" + 
subtaskId);
-                enumerator.handleSplitRequest(subtaskId, "localhost-" + 
subtaskId);
-            }
+            registerReaderAndHandleSplitRequests(context, enumerator, 
numSubtasks, 0);
 
             // try to assign splits
             context.runPeriodicCallable(0);
@@ -161,8 +155,8 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
         int expectNumberOfSplits = 3;
 
         // test get snapshot split assignment
-        try (MockSplitEnumeratorContext<TieringSplit> context =
-                new MockSplitEnumeratorContext<>(numSubtasks)) {
+        try (FlussMockSplitEnumeratorContext<TieringSplit> context =
+                new FlussMockSplitEnumeratorContext<>(numSubtasks)) {
             TieringSourceEnumerator enumerator =
                     new TieringSourceEnumerator(flussConf, context, 500);
 
@@ -170,10 +164,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
             assertThat(context.getSplitsAssignmentSequence()).isEmpty();
 
             // register all readers
-            for (int subtaskId = 0; subtaskId < 3; subtaskId++) {
-                registerReader(context, enumerator, subtaskId, "localhost-" + 
subtaskId);
-                enumerator.handleSplitRequest(subtaskId, "localhost-" + 
subtaskId);
-            }
+            registerReaderAndHandleSplitRequests(context, enumerator, 
numSubtasks, 0);
             waitUntilTieringTableSplitAssignmentReady(context, 
DEFAULT_BUCKET_NUM, 3000L);
 
             Map<Integer, List<TieringSplit>> expectedSnapshotAssignment = new 
HashMap<>();
@@ -252,8 +243,8 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
         int numSubtasks = 4;
         int expectNumberOfSplits = 3;
         // test get log split and the assignment
-        try (MockSplitEnumeratorContext<TieringSplit> context =
-                new MockSplitEnumeratorContext<>(numSubtasks)) {
+        try (FlussMockSplitEnumeratorContext<TieringSplit> context =
+                new FlussMockSplitEnumeratorContext<>(numSubtasks)) {
             TieringSourceEnumerator enumerator =
                     new TieringSourceEnumerator(flussConf, context, 500);
 
@@ -261,10 +252,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
             assertThat(context.getSplitsAssignmentSequence()).isEmpty();
 
             // register all readers
-            for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
-                registerReader(context, enumerator, subtaskId, "localhost-" + 
subtaskId);
-                enumerator.handleSplitRequest(subtaskId, "localhost-" + 
subtaskId);
-            }
+            registerReaderAndHandleSplitRequests(context, enumerator, 
numSubtasks, 0);
 
             // write one row to one bucket, keep the other buckets empty
             Map<Integer, Long> bucketOffsetOfFirstWrite =
@@ -351,8 +339,8 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
         int numSubtasks = 6;
         int expectNumberOfSplits = 6;
         // test get snapshot split assignment
-        try (MockSplitEnumeratorContext<TieringSplit> context =
-                new MockSplitEnumeratorContext<>(numSubtasks)) {
+        try (FlussMockSplitEnumeratorContext<TieringSplit> context =
+                new FlussMockSplitEnumeratorContext<>(numSubtasks)) {
             TieringSourceEnumerator enumerator =
                     new TieringSourceEnumerator(flussConf, context, 500);
 
@@ -360,10 +348,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
             assertThat(context.getSplitsAssignmentSequence()).isEmpty();
 
             // register all readers
-            for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
-                registerReader(context, enumerator, subtaskId, "localhost-" + 
subtaskId);
-                enumerator.handleSplitRequest(subtaskId, "localhost-" + 
subtaskId);
-            }
+            registerReaderAndHandleSplitRequests(context, enumerator, 
numSubtasks, 0);
 
             // try to assign splits
             context.runPeriodicCallable(0);
@@ -455,8 +440,8 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
         int numSubtasks = 6;
         int expectNumberOfSplits = 6;
         // test get log split assignment
-        try (MockSplitEnumeratorContext<TieringSplit> context =
-                new MockSplitEnumeratorContext<>(numSubtasks)) {
+        try (FlussMockSplitEnumeratorContext<TieringSplit> context =
+                new FlussMockSplitEnumeratorContext<>(numSubtasks)) {
             TieringSourceEnumerator enumerator =
                     new TieringSourceEnumerator(flussConf, context, 500);
 
@@ -464,10 +449,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
             assertThat(context.getSplitsAssignmentSequence()).isEmpty();
 
             // register all readers
-            for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
-                registerReader(context, enumerator, subtaskId, "localhost-" + 
subtaskId);
-                enumerator.handleSplitRequest(subtaskId, "localhost-" + 
subtaskId);
-            }
+            registerReaderAndHandleSplitRequests(context, enumerator, 
numSubtasks, 0);
 
             Map<Long, Map<Integer, Long>> bucketOffsetOfFirstWrite =
                     appendRowForPartitionedTable(
@@ -581,8 +563,8 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
         Map<Integer, Long> bucketOffsetOfWrite =
                 appendRow(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10);
         // test get log split and the assignment
-        try (MockSplitEnumeratorContext<TieringSplit> context =
-                new MockSplitEnumeratorContext<>(numSubtasks)) {
+        try (FlussMockSplitEnumeratorContext<TieringSplit> context =
+                new FlussMockSplitEnumeratorContext<>(numSubtasks)) {
             TieringSourceEnumerator enumerator =
                     new TieringSourceEnumerator(flussConf, context, 500);
 
@@ -590,10 +572,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
             assertThat(context.getSplitsAssignmentSequence()).isEmpty();
 
             // register all readers
-            for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
-                registerReader(context, enumerator, subtaskId, "localhost-" + 
subtaskId);
-                enumerator.handleSplitRequest(subtaskId, "localhost-" + 
subtaskId);
-            }
+            registerReaderAndHandleSplitRequests(context, enumerator, 
numSubtasks, 0);
             waitUntilTieringTableSplitAssignmentReady(context, 
DEFAULT_BUCKET_NUM, 200);
 
             List<TieringSplit> expectedAssignment = new ArrayList<>();
@@ -630,7 +609,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
     }
 
     @Test
-    void testHandleFailOverEvent() throws Throwable {
+    void testHandleReaderFailOver() throws Throwable {
         TablePath tablePath1 = TablePath.of(DEFAULT_DB, 
"tiering-failover-test-log-table1");
         createTable(tablePath1, DEFAULT_LOG_TABLE_DESCRIPTOR);
         appendRow(tablePath1, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10);
@@ -639,37 +618,41 @@ class TieringSourceEnumeratorTest extends TieringTestBase 
{
         createTable(tablePath2, DEFAULT_LOG_TABLE_DESCRIPTOR);
         appendRow(tablePath2, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10);
 
-        int numSubtasks = 1;
-        try (MockSplitEnumeratorContext<TieringSplit> context =
-                new MockSplitEnumeratorContext<>(numSubtasks)) {
+        try (FlussMockSplitEnumeratorContext<TieringSplit> context =
+                new FlussMockSplitEnumeratorContext<>(3)) {
             TieringSourceEnumerator enumerator =
                     new TieringSourceEnumerator(flussConf, context, 500);
 
             enumerator.start();
             assertThat(context.getSplitsAssignmentSequence()).isEmpty();
 
-            // register one reader
-            int subtaskId = 0;
-            registerReader(context, enumerator, subtaskId, "localhost-" + 
subtaskId);
-
-            // handle split request
-            enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
+            // register readers and handle split requests for attempt 0
+            registerReaderAndHandleSplitRequests(context, enumerator, 3, 0);
 
             // should get one tiering split, and the split is for tablePath1
-            verifyTieringSplitAssignment(context, 1, tablePath1);
+            verifyTieringSplitAssignment(context, 3, tablePath1);
 
             // clean assignment
             context.getSplitsAssignmentSequence().clear();
 
-            // enumerator handle TieringFailOverEvent, which will mark current 
tiering tablePath1 as
-            // fail, and all pending splits should be clear
-            enumerator.handleSourceEvent(subtaskId, new 
TieringFailOverEvent());
+            // readers failover: Enumerator marks tablePath1 as failed and 
clears its splits
+            // register readers and handle split requests (attempt 1)
+            registerReaderAndHandleSplitRequests(context, enumerator, 3, 1);
 
-            // handle split request
-            enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
             // now, should get another one tiering split, the split is for 
tablePath2 since all
             // pending split for tablePath1 is clear
-            verifyTieringSplitAssignment(context, 1, tablePath2);
+            verifyTieringSplitAssignment(context, 3, tablePath2);
+
+            // clean assignment
+            context.getSplitsAssignmentSequence().clear();
+
+            // readers failover again: Enumerator marks tablePath2 as failed 
and clears its splits
+            // register readers and process split requests for attempt 2
+            registerReaderAndHandleSplitRequests(context, enumerator, 3, 2);
+
+            // now, should get no tiering split, since all pending splits for 
tablePath1 and
+            // tablePath2 have been cleared
+            verifyTieringSplitAssignment(context, 0, tablePath2);
         }
     }
 
@@ -697,17 +680,22 @@ class TieringSourceEnumeratorTest extends TieringTestBase 
{
     }
 
     // --------------------- Test Utils ---------------------
-    private void registerReader(
-            MockSplitEnumeratorContext<TieringSplit> context,
+    private void registerReaderAndHandleSplitRequests(
+            FlussMockSplitEnumeratorContext<TieringSplit> context,
             TieringSourceEnumerator enumerator,
-            int readerId,
-            String hostname) {
-        context.registerReader(new ReaderInfo(readerId, hostname));
-        enumerator.addReader(readerId);
+            int numSubtasks,
+            int attemptNumber) {
+        for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
+            context.registerSourceReader(subtaskId, attemptNumber, 
"localhost-" + subtaskId);
+            enumerator.addReader(subtaskId);
+            enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
+        }
     }
 
     private void waitUntilTieringTableSplitAssignmentReady(
-            MockSplitEnumeratorContext<TieringSplit> context, int 
expectedSplitsNum, long sleepMs)
+            FlussMockSplitEnumeratorContext<TieringSplit> context,
+            int expectedSplitsNum,
+            long sleepMs)
             throws Throwable {
         while (context.getSplitsAssignmentSequence().size() < 
expectedSplitsNum) {
             if (!context.getPeriodicCallables().isEmpty()) {
@@ -726,11 +714,11 @@ class TieringSourceEnumeratorTest extends TieringTestBase 
{
     }
 
     private void verifyTieringSplitAssignment(
-            MockSplitEnumeratorContext<TieringSplit> context,
+            FlussMockSplitEnumeratorContext<TieringSplit> context,
             int expectedSplitSize,
             TablePath expectedTablePath)
             throws Throwable {
-        waitUntilTieringTableSplitAssignmentReady(context, 1, 200);
+        waitUntilTieringTableSplitAssignmentReady(context, expectedSplitSize, 
200);
         List<SplitsAssignment<TieringSplit>> actualAssignment =
                 context.getSplitsAssignmentSequence();
 
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLake.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLake.java
new file mode 100644
index 000000000..218d7a6f8
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLake.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.values;
+
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.utils.MapUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.concurrent.LockUtils.inLock;
+
+/**
+ * An in-memory lake storage implementation for testing, supporting only log 
tables.
+ *
+ * <p>Provides utilities for managing tables, writing records, committing 
stages, and retrieving
+ * results in a test environment.
+ */
+public class TestingValuesLake {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestingValuesLake.class);
+
+    private static final Map<String, TestingValuesTable> globalTables =
+            MapUtils.newConcurrentHashMap();
+    private static final Map<String, TableFailureController> 
FAILURE_CONTROLLERS =
+            MapUtils.newConcurrentHashMap();
+
+    public static TableFailureController failWhen(String tableId) {
+        return FAILURE_CONTROLLERS.computeIfAbsent(tableId, k -> new 
TableFailureController());
+    }
+
+    public static List<InternalRow> getResults(String tableId) {
+        TestingValuesTable table = globalTables.get(tableId);
+        checkNotNull(table, tableId + " does not exist");
+        return table.getResult();
+    }
+
+    public static void writeRecord(String tableId, String stageId, LogRecord 
record)
+            throws IOException {
+        TestingValuesTable table = globalTables.get(tableId);
+        checkNotNull(table, tableId + " does not exist");
+        TableFailureController controller = FAILURE_CONTROLLERS.get(tableId);
+        if (controller != null) {
+            controller.checkWriteShouldFail(tableId);
+        }
+        table.writeRecord(stageId, record);
+        LOG.info("Write record to stage {}: {}", stageId, record);
+    }
+
+    public static long commit(
+            String tableId, List<String> stageIds, Map<String, String> 
snapshotProperties)
+            throws IOException {
+        TestingValuesTable table = globalTables.get(tableId);
+        checkNotNull(table, "commit stage %s failed, table %s does not exist", 
stageIds, tableId);
+        table.commit(stageIds, snapshotProperties);
+        LOG.info("Commit table {} stage {}", tableId, stageIds);
+        return table.getSnapshotId();
+    }
+
+    public static void abort(String tableId, List<String> stageIds) {
+        TestingValuesTable table = globalTables.get(tableId);
+        checkNotNull(
+                table, "abort stage record %s failed, table %s does not 
exist", stageIds, tableId);
+        table.abort(stageIds);
+        LOG.info("Abort table {} stage {}", tableId, stageIds);
+    }
+
+    public static TestingValuesTable getTable(String tableId) {
+        return globalTables.get(tableId);
+    }
+
+    public static void createTable(String tableId) {
+        if (!globalTables.containsKey(tableId)) {
+            globalTables.put(tableId, new TestingValuesTable());
+            TestingValuesTable table = globalTables.get(tableId);
+            checkNotNull(table, "create table %s failed", tableId);
+        }
+    }
+
+    /** Maintains the committed records and the staged records of a specific table in 
memory. */
+    public static class TestingValuesTable {
+
+        private final Lock lock = new ReentrantLock();
+
+        private final List<InternalRow> records;
+        private final Map<String, List<LogRecord>> stageRecords;
+
+        private long snapshotId = -1L;
+
+        public TestingValuesTable() {
+            this.records = new ArrayList<>();
+            this.stageRecords = new HashMap<>();
+        }
+
+        public List<InternalRow> getResult() {
+            return inLock(lock, () -> new ArrayList<>(records));
+        }
+
+        public void writeRecord(String stageId, LogRecord record) {
+            inLock(
+                    lock,
+                    () -> {
+                        this.stageRecords
+                                .computeIfAbsent(stageId, k -> new 
ArrayList<>())
+                                .add(record);
+                    });
+        }
+
+        public void commit(List<String> stageIds, Map<String, String> 
snapshotProperties) {
+            inLock(
+                    lock,
+                    () -> {
+                        for (String stageId : stageIds) {
+                            List<LogRecord> stageRecords = 
this.stageRecords.get(stageId);
+                            stageRecords.forEach(record -> 
records.add(record.getRow()));
+                            this.stageRecords.remove(stageId);
+                        }
+                        this.snapshotId++;
+                    });
+        }
+
+        public void abort(List<String> stageIds) {
+            inLock(
+                    lock,
+                    () -> {
+                        for (String stageId : stageIds) {
+                            this.stageRecords.remove(stageId);
+                        }
+                    });
+        }
+
+        public long getSnapshotId() {
+            return snapshotId;
+        }
+    }
+
+    /** Controller for injecting write failures into a table (only writes are affected). */
+    public static class TableFailureController {
+        private volatile boolean writeFailEnabled = false;
+        private final AtomicInteger writeFailTimes = new AtomicInteger(0);
+        private final AtomicInteger writeFailCounter = new AtomicInteger(0);
+
+        public TableFailureController failWriteOnce() {
+            return failWriteNext(1);
+        }
+
+        /** Force the next N write calls to throw IOException (thread-safe). */
+        public TableFailureController failWriteNext(int times) {
+            this.writeFailEnabled = true;
+            this.writeFailTimes.set(times);
+            this.writeFailCounter.set(0);
+            return this;
+        }
+
+        private void checkWriteShouldFail(String tableId) throws IOException {
+            if (!writeFailEnabled) {
+                return;
+            }
+
+            int count = writeFailCounter.incrementAndGet();
+            if (count <= writeFailTimes.get()) {
+                LOG.warn(
+                        "ValuesLake FAIL_INJECTED: write() intentionally 
failed [table={} attempt={}/{}]",
+                        tableId,
+                        count,
+                        writeFailTimes.get());
+                throw new IOException(
+                        String.format(
+                                "ValuesLake write failure injected for test 
(attempt %d of %d)",
+                                count, writeFailTimes.get()));
+            } else {
+                writeFailEnabled = false;
+            }
+        }
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeCatalog.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeCatalog.java
new file mode 100644
index 000000000..3c5692695
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeCatalog.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.values;
+
+import org.apache.fluss.exception.TableAlreadyExistException;
+import org.apache.fluss.exception.TableNotExistException;
+import org.apache.fluss.lake.lakestorage.LakeCatalog;
+import org.apache.fluss.metadata.TableChange;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+
+import java.util.List;
+
+/** Implementation of {@link LakeCatalog} for values lake. */
+public class TestingValuesLakeCatalog implements LakeCatalog {
+    @Override
+    public void createTable(TablePath tablePath, TableDescriptor 
tableDescriptor, Context context)
+            throws TableAlreadyExistException {
+        TestingValuesLake.createTable(tablePath.toString());
+    }
+
+    @Override
+    public void alterTable(TablePath tablePath, List<TableChange> 
tableChanges, Context context)
+            throws TableNotExistException {
+        throw new RuntimeException("Not impl.");
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeStorage.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeStorage.java
new file mode 100644
index 000000000..96adaebe3
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeStorage.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.values;
+
+import org.apache.fluss.lake.lakestorage.LakeCatalog;
+import org.apache.fluss.lake.lakestorage.LakeStorage;
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.values.tiering.TestingValuesLakeTieringFactory;
+import org.apache.fluss.lake.writer.LakeTieringFactory;
+import org.apache.fluss.metadata.TablePath;
+
+/** Implementation of {@link LakeStorage} for values lake. */
+public class TestingValuesLakeStorage implements LakeStorage {
+    @Override
+    public LakeTieringFactory<?, ?> createLakeTieringFactory() {
+        return new TestingValuesLakeTieringFactory();
+    }
+
+    @Override
+    public LakeCatalog createLakeCatalog() {
+        return new TestingValuesLakeCatalog();
+    }
+
+    @Override
+    public LakeSource<?> createLakeSource(TablePath tablePath) {
+        throw new UnsupportedOperationException("Not impl.");
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeStoragePlugin.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeStoragePlugin.java
new file mode 100644
index 000000000..678080d8c
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeStoragePlugin.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.values;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.lakestorage.LakeStorage;
+import org.apache.fluss.lake.lakestorage.LakeStoragePlugin;
+import org.apache.fluss.metadata.DataLakeFormat;
+
+/**
+ * {@link LakeStoragePlugin} that creates {@link TestingValuesLakeStorage} instances.
+ *
+ * <p>The plugin is registered under the identifier of {@link DataLakeFormat#LANCE}.
+ */
+public class TestingValuesLakeStoragePlugin implements LakeStoragePlugin {
+
+    // Testing/mock implementation for values lake storage. It reuses the Lance data lake
+    // format identifier for compatibility with the existing Fluss lake storage
+    // infrastructure.
+    private static final String IDENTIFIER = DataLakeFormat.LANCE.toString();
+
+    /**
+     * Creates the values lake storage.
+     *
+     * @param configuration not read by this implementation
+     * @return a new {@link TestingValuesLakeStorage}
+     */
+    @Override
+    public LakeStorage createLakeStorage(Configuration configuration) {
+        final LakeStorage valuesLakeStorage = new TestingValuesLakeStorage();
+        return valuesLakeStorage;
+    }
+
+    /** Returns the string form of {@link DataLakeFormat#LANCE}. */
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeCommitter.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeCommitter.java
new file mode 100644
index 000000000..03e1866ab
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeCommitter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.values.tiering;
+
+import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
+import org.apache.fluss.lake.committer.LakeCommitter;
+import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
+import org.apache.fluss.lake.values.TestingValuesLake;
+import org.apache.fluss.utils.InstantiationUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * {@link LakeCommitter} implementation for the testing values lake.
+ *
+ * <p>A committable is simply the list of stage ids carried by the write results. Committing and
+ * aborting are delegated to the static {@link TestingValuesLake} methods, together with the table
+ * id this committer was created with.
+ */
+public class TestingValuesLakeCommitter
+        implements LakeCommitter<
+                TestingValuesLakeWriter.TestingValuesWriteResult,
+                TestingValuesLakeCommitter.TestingValuesCommittable> {
+    private final String tableId;
+
+    /**
+     * @param tableId identifier handed to {@link TestingValuesLake} on every commit and abort
+     */
+    public TestingValuesLakeCommitter(String tableId) {
+        this.tableId = tableId;
+    }
+
+    /**
+     * Collects the stage id of every write result, in order, into one committable.
+     *
+     * @param valuesWriteResults write results to bundle
+     * @return a committable holding the stage ids of {@code valuesWriteResults}
+     */
+    @Override
+    public TestingValuesCommittable toCommittable(
+            List<TestingValuesLakeWriter.TestingValuesWriteResult> valuesWriteResults)
+            throws IOException {
+        List<String> collectedStageIds =
+                valuesWriteResults.stream()
+                        .map(TestingValuesLakeWriter.TestingValuesWriteResult::getStageId)
+                        .collect(Collectors.toList());
+        return new TestingValuesCommittable(collectedStageIds);
+    }
+
+    /**
+     * Commits the stage ids of the committable via {@link TestingValuesLake}.
+     *
+     * @return the value returned by {@code TestingValuesLake.commit}
+     */
+    @Override
+    public long commit(TestingValuesCommittable committable, Map<String, String> snapshotProperties)
+            throws IOException {
+        List<String> stageIds = committable.getStageIds();
+        return TestingValuesLake.commit(tableId, stageIds, snapshotProperties);
+    }
+
+    /** Aborts the stage ids of the committable via {@link TestingValuesLake}. */
+    @Override
+    public void abort(TestingValuesCommittable committable) throws IOException {
+        List<String> stageIds = committable.getStageIds();
+        TestingValuesLake.abort(tableId, stageIds);
+    }
+
+    /** Always returns {@code null}; the argument is not inspected. */
+    @Override
+    public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long latestLakeSnapshotIdOfFluss)
+            throws IOException {
+        return null;
+    }
+
+    @Override
+    public void close() throws Exception {
+        // This committer holds nothing that has to be released.
+    }
+
+    /** Committable of {@link TestingValuesLake}: the stage ids to commit or abort together. */
+    public static class TestingValuesCommittable implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final List<String> stageIdList;
+
+        /**
+         * @param stageIds stage ids to carry; they are copied into a list owned by this object
+         */
+        public TestingValuesCommittable(List<String> stageIds) {
+            this.stageIdList = new ArrayList<>(stageIds);
+        }
+
+        /** Returns the list held by this committable (the backing list itself, not a copy). */
+        public List<String> getStageIds() {
+            return stageIdList;
+        }
+    }
+
+    /**
+     * A serializer for {@link TestingValuesCommittable} that delegates to {@link
+     * InstantiationUtils}.
+     */
+    public static class ValuesCommittableSerializer
+            implements SimpleVersionedSerializer<TestingValuesCommittable> {
+        private static final int CURRENT_VERSION = 1;
+
+        @Override
+        public int getVersion() {
+            return CURRENT_VERSION;
+        }
+
+        @Override
+        public byte[] serialize(TestingValuesCommittable committable) throws IOException {
+            return InstantiationUtils.serializeObject(committable);
+        }
+
+        /**
+         * Restores a committable from {@code serialized}; {@code version} is not consulted.
+         *
+         * @throws IOException if reading fails, or wrapping the {@link ClassNotFoundException}
+         *     raised when a class cannot be resolved
+         */
+        @Override
+        public TestingValuesCommittable deserialize(int version, byte[] serialized)
+                throws IOException {
+            ClassLoader classLoader = getClass().getClassLoader();
+            try {
+                return InstantiationUtils.deserializeObject(serialized, classLoader);
+            } catch (ClassNotFoundException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeTieringFactory.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeTieringFactory.java
new file mode 100644
index 000000000..3a9c78578
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeTieringFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.values.tiering;
+
+import org.apache.fluss.lake.committer.CommitterInitContext;
+import org.apache.fluss.lake.committer.LakeCommitter;
+import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
+import org.apache.fluss.lake.writer.LakeTieringFactory;
+import org.apache.fluss.lake.writer.LakeWriter;
+import org.apache.fluss.lake.writer.WriterInitContext;
+
+import java.io.IOException;
+
+/**
+ * {@link LakeTieringFactory} implementation for the testing values lake.
+ *
+ * <p>Writers and committers are both created with the string form of the table path taken from
+ * their init context, which they use as their table id.
+ */
+public class TestingValuesLakeTieringFactory
+        implements LakeTieringFactory<
+                TestingValuesLakeWriter.TestingValuesWriteResult,
+                TestingValuesLakeCommitter.TestingValuesCommittable> {
+
+    /** Creates a {@link TestingValuesLakeWriter} for the table path of the given context. */
+    @Override
+    public LakeWriter<TestingValuesLakeWriter.TestingValuesWriteResult> createLakeWriter(
+            WriterInitContext writerInitContext) throws IOException {
+        String tableId = writerInitContext.tablePath().toString();
+        return new TestingValuesLakeWriter(tableId);
+    }
+
+    /** Creates a {@link TestingValuesLakeCommitter} for the table path of the given context. */
+    @Override
+    public LakeCommitter<
+                    TestingValuesLakeWriter.TestingValuesWriteResult,
+                    TestingValuesLakeCommitter.TestingValuesCommittable>
+            createLakeCommitter(CommitterInitContext committerInitContext) throws IOException {
+        String tableId = committerInitContext.tablePath().toString();
+        return new TestingValuesLakeCommitter(tableId);
+    }
+
+    /** Returns a new serializer for the write results produced by the values lake writer. */
+    @Override
+    public SimpleVersionedSerializer<TestingValuesLakeWriter.TestingValuesWriteResult>
+            getWriteResultSerializer() {
+        return new TestingValuesLakeWriter.TestingValuesWriteResultSerializer();
+    }
+
+    /** Returns a new serializer for the committables produced by the values lake committer. */
+    @Override
+    public SimpleVersionedSerializer<TestingValuesLakeCommitter.TestingValuesCommittable>
+            getCommittableSerializer() {
+        return new TestingValuesLakeCommitter.ValuesCommittableSerializer();
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeWriter.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeWriter.java
new file mode 100644
index 000000000..480ebe307
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeWriter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.values.tiering;
+
+import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
+import org.apache.fluss.lake.values.TestingValuesLake;
+import org.apache.fluss.lake.writer.LakeWriter;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.utils.InstantiationUtils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.UUID;
+
+/**
+ * {@link LakeWriter} implementation for the testing values lake.
+ *
+ * <p>Each writer instance draws a random UUID as its writer id. Records are forwarded to {@link
+ * TestingValuesLake} under that id, and the same id is reported as the stage id of the write
+ * result.
+ */
+public class TestingValuesLakeWriter
+        implements LakeWriter<TestingValuesLakeWriter.TestingValuesWriteResult> {
+    private final String tableId;
+    private final String writerId;
+
+    /**
+     * @param tableId identifier handed to {@link TestingValuesLake} with every record
+     */
+    public TestingValuesLakeWriter(String tableId) {
+        this.tableId = tableId;
+        UUID freshId = UUID.randomUUID();
+        this.writerId = freshId.toString();
+    }
+
+    /** Forwards the record to {@link TestingValuesLake} with this writer's table and writer id. */
+    @Override
+    public void write(LogRecord record) throws IOException {
+        TestingValuesLake.writeRecord(tableId, writerId, record);
+    }
+
+    /** Returns a write result whose stage id is this writer's id. */
+    @Override
+    public TestingValuesWriteResult complete() throws IOException {
+        TestingValuesWriteResult writeResult = new TestingValuesWriteResult(writerId);
+        return writeResult;
+    }
+
+    @Override
+    public void close() throws IOException {
+        // This writer holds nothing that has to be released.
+    }
+
+    /** Write result of {@link TestingValuesLake}: carries a single stage id. */
+    public static class TestingValuesWriteResult implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final String stageId;
+
+        public TestingValuesWriteResult(String stageId) {
+            this.stageId = stageId;
+        }
+
+        /** Returns the stage id given at construction. */
+        public String getStageId() {
+            return stageId;
+        }
+    }
+
+    /**
+     * A serializer for {@link TestingValuesWriteResult} that delegates to {@link
+     * InstantiationUtils}.
+     */
+    public static class TestingValuesWriteResultSerializer
+            implements SimpleVersionedSerializer<TestingValuesWriteResult> {
+
+        private static final int CURRENT_VERSION = 1;
+
+        @Override
+        public int getVersion() {
+            return CURRENT_VERSION;
+        }
+
+        @Override
+        public byte[] serialize(TestingValuesWriteResult valuesWriteResult) throws IOException {
+            return InstantiationUtils.serializeObject(valuesWriteResult);
+        }
+
+        /**
+         * Restores a write result from {@code serialized}; {@code version} is not consulted.
+         *
+         * @throws IOException if reading fails, or wrapping the {@link ClassNotFoundException}
+         *     raised when a class cannot be resolved
+         */
+        @Override
+        public TestingValuesWriteResult deserialize(int version, byte[] serialized)
+                throws IOException {
+            ClassLoader classLoader = getClass().getClassLoader();
+            try {
+                return InstantiationUtils.deserializeObject(serialized, classLoader);
+            } catch (ClassNotFoundException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
 
b/fluss-flink/fluss-flink-common/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
new file mode 100644
index 000000000..3fafafad0
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.fluss.lake.values.TestingValuesLakeStoragePlugin
\ No newline at end of file
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
index 49983b603..11de6baed 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
@@ -270,11 +270,11 @@ class IcebergTieringITCase extends 
FlinkIcebergTieringTestBase {
 
     private void checkDataInIcebergPrimaryKeyTable(
             TablePath tablePath, List<InternalRow> expectedRows) throws 
Exception {
-        Iterator<Record> acturalIterator = 
getIcebergRecords(tablePath).iterator();
+        Iterator<Record> actualIterator = 
getIcebergRecords(tablePath).iterator();
         Iterator<InternalRow> iterator = expectedRows.iterator();
-        while (iterator.hasNext() && acturalIterator.hasNext()) {
+        while (iterator.hasNext() && actualIterator.hasNext()) {
             InternalRow row = iterator.next();
-            Record record = acturalIterator.next();
+            Record record = actualIterator.next();
             assertThat(record.get(0)).isEqualTo(row.getBoolean(0));
             assertThat(record.get(1)).isEqualTo((int) row.getByte(1));
             assertThat(record.get(2)).isEqualTo((int) row.getShort(2));
@@ -307,7 +307,7 @@ class IcebergTieringITCase extends 
FlinkIcebergTieringTestBase {
             assertThat(record.get(17)).isEqualTo(row.getChar(17, 
3).toString());
             
assertThat(record.get(18)).isEqualTo(ByteBuffer.wrap(row.getBytes(18)));
         }
-        assertThat(acturalIterator.hasNext()).isFalse();
+        assertThat(actualIterator.hasNext()).isFalse();
         assertThat(iterator.hasNext()).isFalse();
     }
 

Reply via email to