Copilot commented on code in PR #1984:
URL: https://github.com/apache/fluss/pull/1984#discussion_r2682159498


##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeStoragePlugin.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.values;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.lakestorage.LakeStorage;
+import org.apache.fluss.lake.lakestorage.LakeStoragePlugin;
+import org.apache.fluss.metadata.DataLakeFormat;
+
+/** Implementation of {@link LakeStoragePlugin} for values lake. */
+public class TestingValuesLakeStoragePlugin implements LakeStoragePlugin {
+
+    // Uses the default Fluss key encoder and bucketing function via the Lance format identifier.
+    // TODO: Make lake format pluggable

Review Comment:
   The comment states "Uses the default Fluss key encoder and bucketing 
function via the Lance format identifier" and then notes "TODO: Make lake 
format pluggable". However, the identifier is set to 
`DataLakeFormat.LANCE.toString()` while this is a testing values lake storage 
plugin, not an actual Lance implementation. This comment appears to be copied 
from another implementation and doesn't accurately describe this testing 
plugin. The comment should be updated to reflect that this is a testing/mock 
implementation that uses the Lance format identifier for compatibility purposes.
   ```suggestion
    // Testing/mock implementation for values lake storage that reuses the Lance data lake
    // format identifier for compatibility with existing Fluss lake storage infrastructure.
    // TODO: Make the lake format identifier configurable for tests instead of hard-coding LANCE.
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -179,11 +181,53 @@ public void addSplitsBack(List<TieringSplit> splits, int subtaskId) {
     @Override
     public void addReader(int subtaskId) {
         LOG.info("Adding reader: {} to Tiering Source enumerator.", subtaskId);
-        if (context.registeredReaders().containsKey(subtaskId)) {
+        Map<Integer, ReaderInfo> readerByAttempt =
+                context.registeredReadersOfAttempts().get(subtaskId);
+        if (readerByAttempt != null && !readerByAttempt.isEmpty()) {
             readersAwaitingSplit.add(subtaskId);
+            int maxAttempt = max(readerByAttempt.keySet());
+            if (maxAttempt >= 1) {
+                if (isFailOvering) {
+                    LOG.warn(
+                            "Subtask {} (max attempt {}) registered during ongoing failover.",
+                            subtaskId,
+                            maxAttempt);
+                } else {
+                    LOG.warn(
+                            "Detected failover: subtask {} has max attempt {} > 0. Triggering global failover handling.",
+                            subtaskId,
+                            maxAttempt);
+                    // should be failover
+                    isFailOvering = true;
+                    handleSourceReaderFailOver();
+                }
+
+                // if registered readers equal to current parallelism, check whether all registered
+                // readers have same max attempt
+                if (context.registeredReadersOfAttempts().size() == context.currentParallelism()) {
+                    // Check if all readers have the same max attempt number
+                    Set<Integer> maxAttempts =
+                            context.registeredReadersOfAttempts().values().stream()
+                                    .map(_readerByAttempt -> max(_readerByAttempt.keySet()))
+                                    .collect(Collectors.toSet());
+                    int globalMaxAttempt = max(maxAttempts);
+                    if (maxAttempts.size() == 1 && globalMaxAttempt >= 1) {
+                        LOG.info(
+                                "Failover completed. All {} subtasks reached the same attempt number {}. Current registered readers are {}",
+                                context.currentParallelism(),
+                                globalMaxAttempt,
+                                context.registeredReadersOfAttempts());
+                        isFailOvering = false;
+                    }
+                }
+            }
         }

Review Comment:
   The method `registeredReadersOfAttempts()` is being called on `context`, 
which is of type `SplitEnumeratorContext<TieringSplit>`. However, this method 
doesn't exist in Flink's standard `SplitEnumeratorContext` interface. It only 
appears to be implemented in the test mock class 
`FlussMockSplitEnumeratorContext`. This will cause a compilation error or 
runtime failure when this code is executed with the actual Flink 
SplitEnumeratorContext. You need to either wrap or extend the enumerator context with an interface that exposes this method, or find an alternative way to detect failover that works with the standard Flink API.
   ```suggestion
        // Register the reader and treat it as awaiting a split. We only use the standard
        // SplitEnumeratorContext API here to remain compatible with Flink's interfaces.
           readersAwaitingSplit.add(subtaskId);
           // If there are pending splits, try to assign them now.
           assignSplits();
   ```
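   As an illustration of the second option (detecting failover without `registeredReadersOfAttempts()`), the sketch below tracks registrations per subtask inside the enumerator itself. This is not part of the PR: the class name and approach are assumptions, and it only detects reader-only restarts, since a full job failover also recreates the enumerator (and this tracker) from state.
   ```java
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal sketch (not from the PR): detects a likely reader restart using only information
 * available through the standard SplitEnumeratorContext callbacks, by counting how often
 * addReader() has been observed for each subtask in this enumerator instance.
 */
final class ReaderRestartTracker {

    // Number of addReader() calls observed per subtask since this enumerator was created.
    private final Map<Integer, Integer> registrationsPerSubtask = new HashMap<>();

    /** Records a registration and returns true if the subtask had registered before. */
    boolean recordRegistrationAndCheckRestart(int subtaskId) {
        int previousCount = registrationsPerSubtask.merge(subtaskId, 1, Integer::sum) - 1;
        return previousCount > 0;
    }
}
   ```
   In `addReader(int subtaskId)`, the enumerator would then trigger its failover handling only when `recordRegistrationAndCheckRestart(subtaskId)` returns true.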



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/FlussMockSplitEnumeratorContext.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.source.enumerator;
+
+import org.apache.fluss.utils.MapUtils;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A mock extension of {@link MockSplitEnumeratorContext} for testing purposes, support registering
+ * source readers with attempt number.
+ *
+ * @param <SplitT> The type of {@link SourceSplit} used by the source.
+ */
+class FlussMockSplitEnumeratorContext<SplitT extends SourceSplit>
+        extends MockSplitEnumeratorContext<SplitT> {
+
+    private final ConcurrentMap<Integer, ConcurrentMap<Integer, ReaderInfo>> registeredReaders;
+
+    public FlussMockSplitEnumeratorContext(int parallelism) {
+        super(parallelism);
+        this.registeredReaders = MapUtils.newConcurrentHashMap();
+    }
+
+    public void registerSourceReader(int subtaskId, int attemptNumber, String location) {
+        final Map<Integer, ReaderInfo> attemptReaders =
+                registeredReaders.computeIfAbsent(subtaskId, k -> MapUtils.newConcurrentHashMap());
+        checkState(
+                !attemptReaders.containsKey(attemptNumber),
+                "ReaderInfo of subtask %s (#%s) already exists.",
+                subtaskId,
+                attemptNumber);
+        attemptReaders.put(attemptNumber, new ReaderInfo(subtaskId, location));
+    }
+
+    @Override
+    public void registerReader(ReaderInfo readerInfo) {
+        this.registerSourceReader(readerInfo.getSubtaskId(), 0, readerInfo.getLocation());
+    }
+
+    @Override
+    public Map<Integer, ReaderInfo> registeredReaders() {
+        final Map<Integer, ReaderInfo> readers = new HashMap<>();
+        for (Map.Entry<Integer, ConcurrentMap<Integer, ReaderInfo>> entry :
+                registeredReaders.entrySet()) {
+            final int subtaskIndex = entry.getKey();
+            final Map<Integer, ReaderInfo> attemptReaders = entry.getValue();
+            int earliestAttempt = Integer.MAX_VALUE;
+            for (int attemptNumber : attemptReaders.keySet()) {
+                if (attemptNumber < earliestAttempt) {
+                    earliestAttempt = attemptNumber;
+                }
+            }
+            readers.put(subtaskIndex, attemptReaders.get(earliestAttempt));

Review Comment:
   The logic for finding the earliest attempt is inefficient. Instead of 
iterating through all keys to find the minimum, you can use 
`Collections.min(attemptReaders.keySet())` or 
`attemptReaders.keySet().stream().min(Integer::compareTo).orElse(Integer.MAX_VALUE)`.
However, note that if `attemptReaders` were ever empty, the current implementation would look up `Integer.MAX_VALUE`, which doesn't exist in the map, and silently put a `null` `ReaderInfo` into the result.
   ```suggestion
               if (!attemptReaders.isEmpty()) {
                final int earliestAttempt = Collections.min(attemptReaders.keySet());
                readers.put(subtaskIndex, attemptReaders.get(earliestAttempt));
               }
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -179,11 +181,53 @@ public void addSplitsBack(List<TieringSplit> splits, int subtaskId) {
     @Override
     public void addReader(int subtaskId) {
         LOG.info("Adding reader: {} to Tiering Source enumerator.", subtaskId);
-        if (context.registeredReaders().containsKey(subtaskId)) {
+        Map<Integer, ReaderInfo> readerByAttempt =
+                context.registeredReadersOfAttempts().get(subtaskId);
+        if (readerByAttempt != null && !readerByAttempt.isEmpty()) {
             readersAwaitingSplit.add(subtaskId);
+            int maxAttempt = max(readerByAttempt.keySet());
+            if (maxAttempt >= 1) {
+                if (isFailOvering) {
+                    LOG.warn(
+                            "Subtask {} (max attempt {}) registered during ongoing failover.",
+                            subtaskId,
+                            maxAttempt);
+                } else {
+                    LOG.warn(
+                            "Detected failover: subtask {} has max attempt {} > 0. Triggering global failover handling.",
+                            subtaskId,
+                            maxAttempt);
+                    // should be failover
+                    isFailOvering = true;
+                    handleSourceReaderFailOver();
+                }
+
+                // if registered readers equal to current parallelism, check whether all registered
+                // readers have same max attempt
+                if (context.registeredReadersOfAttempts().size() == context.currentParallelism()) {
+                    // Check if all readers have the same max attempt number
+                    Set<Integer> maxAttempts =
+                            context.registeredReadersOfAttempts().values().stream()
+                                    .map(_readerByAttempt -> max(_readerByAttempt.keySet()))
+                                    .collect(Collectors.toSet());
+                    int globalMaxAttempt = max(maxAttempts);
+                    if (maxAttempts.size() == 1 && globalMaxAttempt >= 1) {
+                        LOG.info(
+                                "Failover completed. All {} subtasks reached the same attempt number {}. Current registered readers are {}",
+                                context.currentParallelism(),
+                                globalMaxAttempt,
+                                context.registeredReadersOfAttempts());
+                        isFailOvering = false;
+                    }
+                }
+            }

Review Comment:
   The logic checks `maxAttempt >= 1` to detect failover, but this means the 
first failover (attempt 1) is treated the same as subsequent failovers (attempt 
2, 3, etc.). Consider whether this is the intended behavior. If you want to 
handle the first failover specially, you might want to track and compare the 
actual attempt numbers rather than just checking if it's greater than or equal 
to 1.
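   If the first failover should be distinguished from later ones, a per-subtask comparison against the last attempt seen is one option. The sketch below is illustrative only (the class name and its use are assumptions, not part of the PR); it makes the failover handling fire once per newly observed attempt instead of on every registration whose attempt is >= 1.
   ```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch: remembers the highest attempt number seen per subtask so that
 * failover handling runs once per new attempt rather than for every attempt >= 1.
 */
final class AttemptTracker {

    // Highest attempt number observed so far for each subtask.
    private final Map<Integer, Integer> lastSeenAttempt = new HashMap<>();

    /** Returns true if the given attempt is strictly newer than any attempt seen for the subtask. */
    boolean isNewAttempt(int subtaskId, int attempt) {
        Integer previous = lastSeenAttempt.get(subtaskId);
        if (previous == null || attempt > previous) {
            lastSeenAttempt.put(subtaskId, attempt);
            return true;
        }
        return false;
    }
}
   ```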



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -179,11 +181,53 @@ public void addSplitsBack(List<TieringSplit> splits, int subtaskId) {
     @Override
     public void addReader(int subtaskId) {
         LOG.info("Adding reader: {} to Tiering Source enumerator.", subtaskId);
-        if (context.registeredReaders().containsKey(subtaskId)) {
+        Map<Integer, ReaderInfo> readerByAttempt =
+                context.registeredReadersOfAttempts().get(subtaskId);
+        if (readerByAttempt != null && !readerByAttempt.isEmpty()) {
             readersAwaitingSplit.add(subtaskId);
+            int maxAttempt = max(readerByAttempt.keySet());
+            if (maxAttempt >= 1) {
+                if (isFailOvering) {
+                    LOG.warn(
+                            "Subtask {} (max attempt {}) registered during ongoing failover.",
+                            subtaskId,
+                            maxAttempt);
+                } else {
+                    LOG.warn(
+                            "Detected failover: subtask {} has max attempt {} > 0. Triggering global failover handling.",
+                            subtaskId,
+                            maxAttempt);
+                    // should be failover
+                    isFailOvering = true;
+                    handleSourceReaderFailOver();
+                }
+
+                // if registered readers equal to current parallelism, check whether all registered
+                // readers have same max attempt
+                if (context.registeredReadersOfAttempts().size() == context.currentParallelism()) {
+                    // Check if all readers have the same max attempt number
+                    Set<Integer> maxAttempts =
+                            context.registeredReadersOfAttempts().values().stream()
+                                    .map(_readerByAttempt -> max(_readerByAttempt.keySet()))
+                                    .collect(Collectors.toSet());
+                    int globalMaxAttempt = max(maxAttempts);
+                    if (maxAttempts.size() == 1 && globalMaxAttempt >= 1) {
+                        LOG.info(
+                                "Failover completed. All {} subtasks reached the same attempt number {}. Current registered readers are {}",
+                                context.currentParallelism(),
+                                globalMaxAttempt,
+                                context.registeredReadersOfAttempts());
+                        isFailOvering = false;
+                    }
+                }
+            }

Review Comment:
   The method `registeredReadersOfAttempts()` is being called on `context`, but 
this method doesn't exist in Flink's standard `SplitEnumeratorContext` 
interface. This will cause a compilation error or runtime failure.
   ```suggestion
        // Use the standard Flink API to get the currently registered readers.
        Map<Integer, ReaderInfo> registeredReaders = context.registeredReaders();
           ReaderInfo readerInfo = registeredReaders.get(subtaskId);
           if (readerInfo != null) {
               readersAwaitingSplit.add(subtaskId);
   ```



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkValuesTieringTestBase.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.writer.TableWriter;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.lake.values.ValuesLake;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.replica.Replica;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.server.zk.data.lake.LakeTable;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.fluss.lake.committer.LakeCommitter.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test base for tiering to Values Lake by Flink. */
+class FlinkValuesTieringTestBase {
+
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setClusterConf(initConfig())
+                    .setNumOfTabletServers(3)
+                    .build();
+
+    protected StreamExecutionEnvironment execEnv;
+
+    protected static Connection conn;
+    protected static Admin admin;
+    protected static Configuration clientConf;
+
+    private static Configuration initConfig() {
+        Configuration conf = new Configuration();
+        conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
+                .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
+
+        // Configure the tiering sink to be Paimon

Review Comment:
   The comment states "Configure the tiering sink to be Paimon" but the code 
actually sets the data lake format to LANCE, not Paimon. This is misleading and 
should be corrected to either "Configure the tiering sink to be Lance" or 
"Configure the data lake format to Lance".
   ```suggestion
           // Configure the tiering sink to be Lance
   ```
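   For reference, the surrounding `initConfig()` presumably ends up looking roughly like the sketch below once the comment is corrected. The `ConfigOptions.DATALAKE_FORMAT` constant name is an assumption here (only the KV options are visible in the diff), so treat it as a placeholder rather than the verified Fluss API.
   ```java
import java.time.Duration;

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.DataLakeFormat;

final class InitConfigSketch {

    private InitConfigSketch() {}

    /** Sketch of the test cluster configuration; DATALAKE_FORMAT is an assumed option name. */
    static Configuration initConfig() {
        Configuration conf = new Configuration();
        conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
                .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
        // Configure the data lake format to Lance, matching the corrected comment above.
        conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.LANCE);
        return conf;
    }
}
   ```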



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -179,11 +181,53 @@ public void addSplitsBack(List<TieringSplit> splits, int subtaskId) {
     @Override
     public void addReader(int subtaskId) {
         LOG.info("Adding reader: {} to Tiering Source enumerator.", subtaskId);
-        if (context.registeredReaders().containsKey(subtaskId)) {
+        Map<Integer, ReaderInfo> readerByAttempt =
+                context.registeredReadersOfAttempts().get(subtaskId);
+        if (readerByAttempt != null && !readerByAttempt.isEmpty()) {
             readersAwaitingSplit.add(subtaskId);
+            int maxAttempt = max(readerByAttempt.keySet());
+            if (maxAttempt >= 1) {
+                if (isFailOvering) {
+                    LOG.warn(
+                            "Subtask {} (max attempt {}) registered during ongoing failover.",
+                            subtaskId,
+                            maxAttempt);
+                } else {
+                    LOG.warn(
+                            "Detected failover: subtask {} has max attempt {} > 0. Triggering global failover handling.",
+                            subtaskId,
+                            maxAttempt);
+                    // should be failover
+                    isFailOvering = true;
+                    handleSourceReaderFailOver();
+                }
+
+                // if registered readers equal to current parallelism, check whether all registered
+                // readers have same max attempt
+                if (context.registeredReadersOfAttempts().size() == context.currentParallelism()) {

Review Comment:
   The method `registeredReadersOfAttempts()` is being called on `context`, but 
this method doesn't exist in Flink's standard `SplitEnumeratorContext` 
interface. This will cause a compilation error or runtime failure.



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/ValuesLakeCommitter.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.values.tiering;
+
+import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
+import org.apache.fluss.lake.committer.LakeCommitter;
+import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
+import org.apache.fluss.lake.values.ValuesLake;
+import org.apache.fluss.utils.InstantiationUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Implementation of {@link LakeCommitter} for values lake. */
+public class ValuesLakeCommitter
+        implements LakeCommitter<
+                ValuesLakeWriter.ValuesWriteResult, ValuesLakeCommitter.ValuesCommittable> {
+    private final String tableId;
+
+    public ValuesLakeCommitter(String tableId) {
+        this.tableId = tableId;
+    }
+
+    @Override
+    public ValuesCommittable toCommittable(
+            List<ValuesLakeWriter.ValuesWriteResult> valuesWriteResults) throws IOException {
+        return new ValuesCommittable(
+                valuesWriteResults.stream()
+                        .map(ValuesLakeWriter.ValuesWriteResult::getStageId)
+                        .collect(Collectors.toList()));
+    }
+
+    @Override
+    public long commit(ValuesCommittable committable, Map<String, String> snapshotProperties)
+            throws IOException {
+        return ValuesLake.commit(tableId, committable.getStageIds(), snapshotProperties);
+    }
+
+    @Override
+    public void abort(ValuesCommittable committable) throws IOException {
+        ValuesLake.abort(tableId, committable.getStageIds());
+    }
+
+    @Override
+    public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long latestLakeSnapshotIdOfFluss)
+            throws IOException {
+        ValuesLake.ValuesTable table = ValuesLake.getTable(tableId);
+        long latestSnapshotId = table.getSnapshotId();
+        if (latestSnapshotId < 0) {
+            return null;
+        }
+        if (latestLakeSnapshotIdOfFluss != null
+                && latestSnapshotId == latestLakeSnapshotIdOfFluss) {
+            if (latestSnapshotId <= latestLakeSnapshotIdOfFluss) {
+                return null;
+            }

Review Comment:
   This conditional logic is redundant and incorrect. Line 74 checks if 
`latestSnapshotId == latestLakeSnapshotIdOfFluss`, and then line 76 checks if 
`latestSnapshotId <= latestLakeSnapshotIdOfFluss`. If line 74's condition is 
true (they are equal), then line 76's condition will always be true (since 
equal means less than or equal to). The inner check on line 76 should either be 
removed, or the outer condition on line 74 should be changed. The logic appears 
to intend: if `latestLakeSnapshotIdOfFluss` is not null and the latest snapshot 
ID is less than or equal to it, return null.
   ```suggestion
                   && latestSnapshotId <= latestLakeSnapshotIdOfFluss) {
               return null;
   ```
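   Applying that suggestion, the intent can also be captured as a single guard. The helper below is a standalone sketch (the name and shape are illustrative, not from the PR) that mirrors the corrected condition: nothing is missing unless the lake snapshot is strictly newer than the one Fluss already knows about.
   ```java
import javax.annotation.Nullable;

/** Standalone sketch of the simplified check in getMissingLakeSnapshot (illustrative only). */
final class MissingSnapshotCheck {

    private MissingSnapshotCheck() {}

    /**
     * Returns true when the lake holds a snapshot newer than the latest one recorded by Fluss,
     * i.e. when getMissingLakeSnapshot should build and return a CommittedLakeSnapshot;
     * returns false when the caller should return null.
     */
    static boolean hasMissingSnapshot(long latestSnapshotId, @Nullable Long latestLakeSnapshotIdOfFluss) {
        if (latestSnapshotId < 0) {
            return false; // the values lake table has no snapshot at all
        }
        // Single guard replacing the redundant nested checks.
        return latestLakeSnapshotIdOfFluss == null || latestSnapshotId > latestLakeSnapshotIdOfFluss;
    }
}
   ```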



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -179,11 +181,53 @@ public void addSplitsBack(List<TieringSplit> splits, int subtaskId) {
     @Override
     public void addReader(int subtaskId) {
         LOG.info("Adding reader: {} to Tiering Source enumerator.", subtaskId);
-        if (context.registeredReaders().containsKey(subtaskId)) {
+        Map<Integer, ReaderInfo> readerByAttempt =
+                context.registeredReadersOfAttempts().get(subtaskId);
+        if (readerByAttempt != null && !readerByAttempt.isEmpty()) {
             readersAwaitingSplit.add(subtaskId);
+            int maxAttempt = max(readerByAttempt.keySet());
+            if (maxAttempt >= 1) {
+                if (isFailOvering) {
+                    LOG.warn(
+                            "Subtask {} (max attempt {}) registered during ongoing failover.",
+                            subtaskId,
+                            maxAttempt);
+                } else {
+                    LOG.warn(
+                            "Detected failover: subtask {} has max attempt {} > 0. Triggering global failover handling.",
+                            subtaskId,
+                            maxAttempt);
+                    // should be failover
+                    isFailOvering = true;
+                    handleSourceReaderFailOver();
+                }
+
+                // if registered readers equal to current parallelism, check whether all registered
+                // readers have same max attempt
+                if (context.registeredReadersOfAttempts().size() == context.currentParallelism()) {
+                    // Check if all readers have the same max attempt number
+                    Set<Integer> maxAttempts =
+                            context.registeredReadersOfAttempts().values().stream()
+                                    .map(_readerByAttempt -> max(_readerByAttempt.keySet()))
+                                    .collect(Collectors.toSet());
+                    int globalMaxAttempt = max(maxAttempts);
+                    if (maxAttempts.size() == 1 && globalMaxAttempt >= 1) {
+                        LOG.info(
+                                "Failover completed. All {} subtasks reached the same attempt number {}. Current registered readers are {}",
+                                context.currentParallelism(),
+                                globalMaxAttempt,
+                                context.registeredReadersOfAttempts());

Review Comment:
   The method `registeredReadersOfAttempts()` is being called on `context`, but 
this method doesn't exist in Flink's standard `SplitEnumeratorContext` 
interface. This will cause a compilation error or runtime failure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
