Copilot commented on code in PR #2185:
URL: https://github.com/apache/fluss/pull/2185#discussion_r2626793012


##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.source;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.tiering.TestingLakeTieringFactory;
+import org.apache.fluss.flink.tiering.TestingWriteResult;
+import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
+import org.apache.fluss.flink.tiering.source.split.TieringLogSplit;
+import org.apache.fluss.flink.utils.FlinkTestBase;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link TieringSourceReader}. */
+class TieringSourceReaderTest extends FlinkTestBase {
+
+    @Test
+    void testHandleTieringReachMaxDurationEvent1() throws Exception {

Review Comment:
   The test method name 'testHandleTieringReachMaxDurationEvent1' has a '1' 
suffix, which typically indicates there should be multiple similar tests or 
that this is a placeholder name. Consider using a more descriptive name like 
'testHandleTieringReachMaxDurationEventWithForceIgnore' or 
'testMaxDurationEventWithEmptyAndNonEmptySplits' to better describe what the 
test verifies.
   ```suggestion
       void testHandleTieringReachMaxDurationEventWithForceIgnore() throws Exception {
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -64,18 +64,24 @@ public class TieringSplitReader<WriteResult>
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TieringSplitReader.class);
 
-    private static final Duration POLL_TIMEOUT = Duration.ofMillis(10000L);
+    public static final Duration DEFAULT_POLL_TIMEOUT = 
Duration.ofMillis(10_000L);
 
     // unknown bucket timestamp for empty split or snapshot split
     private static final long UNKNOWN_BUCKET_TIMESTAMP = -1;
 
+    private static final long UNKNOW_BUCKET_OFFSET = -1;
+

Review Comment:
   Typo in constant name: 'UNKNOW_BUCKET_OFFSET' should be 
'UNKNOWN_BUCKET_OFFSET' to be consistent with UNKNOWN_BUCKET_TIMESTAMP.
   ```suggestion
       // unknown bucket offset for empty split or snapshot split
       private static final long UNKNOWN_BUCKET_OFFSET = -1;
   
       /**
        * @deprecated Use {@link #UNKNOWN_BUCKET_OFFSET} instead.
        */
       @Deprecated
       private static final long UNKNOW_BUCKET_OFFSET = UNKNOWN_BUCKET_OFFSET;
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceOptions.java:
##########
@@ -34,4 +34,21 @@ public class TieringSourceOptions {
                     .defaultValue(Duration.ofSeconds(30))
                     .withDescription(
                             "The fixed interval to request tiering table from 
Fluss cluster, by default 30 seconds.");
+
+    public static final ConfigOption<Duration> TIERING_TABLE_DURATION_MAX =
+            key("tiering.table.duration.max")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(10))
+                    .withDescription(
+                            "The maximum duration for tiering a single table. 
If tiering a table exceeds this duration, "
+                                    + "it will be force completed: the tiering 
will be finalized and committed to the data lake "
+                                    + "(e.g., Paimon) immediately, even if 
they haven't reached their desired stopping offsets.");

Review Comment:
   The description mentions that tiering will be "force completed" and 
"committed to the data lake (e.g., Paimon) immediately, even if they haven't 
reached their desired stopping offsets." However, the implementation shows that 
splits marked with forceIgnore are skipped entirely (TieringSplitReader lines 
181-184), not force-completed. This discrepancy between the documentation and 
implementation could mislead users. Consider clarifying that timed-out splits 
may be either force-completed (if in progress) or skipped (if not yet started).



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -236,6 +277,53 @@ public void handleSourceEvent(int subtaskId, SourceEvent 
sourceEvent) {
         }
     }
 
+    private Set<Long> checkTableReachMaxTieringDuration() {
+        Set<Long> tieringReachMaxDurationTables = new HashSet<>();
+        long currentTime = clock.milliseconds();
+        for (Map.Entry<Long, Long> tieringTableDeadline : 
tieringTablesDeadline.entrySet()) {
+            long tableId = tieringTableDeadline.getKey();
+            long deadline = tieringTableDeadline.getValue();
+            if (deadline < currentTime) {
+                tieringReachMaxDurationTables.add(tableId);
+            }
+        }
+        return tieringReachMaxDurationTables;
+    }
+
+    private void handleReachMaxTieringDurationTables(
+            Set<Long> tieringReachMaxDurationTables, Throwable throwable) {
+        if (throwable != null) {
+            LOG.error("Fail to check tiering timeout tables.", throwable);
+            return;
+        }
+
+        for (Long reachMaxDurationTable : tieringReachMaxDurationTables) {
+            for (TieringSplit tieringSplit : pendingSplits) {
+                if (tieringSplit.getTableBucket().getTableId() == 
reachMaxDurationTable) {
+                    // force ignore this tiering split since the tiering for 
this table is timeout,
+                    // we have to force to set to ignore the tiering split so 
that the
+                    // tiering source reader can ignore them directly
+                    tieringSplit.forceIgnore();
+                } else {
+                    // we can break directly, if found any one split's table 
id is not equal to the
+                    // timeout
+                    // table, the following split must be not equal to the 
table id
+                    break;

Review Comment:
   The pendingSplits list is accessed both from the periodic timeout check 
(line 301) and the assignSplits method (line 350), which can be called from 
different threads via handleSplitRequest and addSplitsBack. While assignSplits 
synchronizes on readersAwaitingSplit, the timeout check doesn't use any 
synchronization, which could lead to a ConcurrentModificationException or 
inconsistent state when iterating and modifying pendingSplits concurrently.
   ```suggestion
               // Access to pendingSplits must be synchronized consistently with other paths
               // (e.g., assignSplits) to avoid concurrent modification.
               synchronized (readersAwaitingSplit) {
                   for (TieringSplit tieringSplit : pendingSplits) {
                       if (tieringSplit.getTableBucket().getTableId() == reachMaxDurationTable) {
                           // force ignore this tiering split since the tiering for this table is
                           // timeout, we have to force to set to ignore the tiering split so that the
                           // tiering source reader can ignore them directly
                           tieringSplit.forceIgnore();
                       } else {
                           // we can break directly, if found any one split's table id is not equal to
                           // the timeout table, the following split must be not equal to the table id
                           break;
                       }
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -236,6 +277,53 @@ public void handleSourceEvent(int subtaskId, SourceEvent 
sourceEvent) {
         }
     }
 
+    private Set<Long> checkTableReachMaxTieringDuration() {
+        Set<Long> tieringReachMaxDurationTables = new HashSet<>();
+        long currentTime = clock.milliseconds();
+        for (Map.Entry<Long, Long> tieringTableDeadline : 
tieringTablesDeadline.entrySet()) {
+            long tableId = tieringTableDeadline.getKey();
+            long deadline = tieringTableDeadline.getValue();
+            if (deadline < currentTime) {
+                tieringReachMaxDurationTables.add(tableId);
+            }
+        }
+        return tieringReachMaxDurationTables;
+    }
+
+    private void handleReachMaxTieringDurationTables(
+            Set<Long> tieringReachMaxDurationTables, Throwable throwable) {
+        if (throwable != null) {
+            LOG.error("Fail to check tiering timeout tables.", throwable);
+            return;
+        }
+
+        for (Long reachMaxDurationTable : tieringReachMaxDurationTables) {
+            for (TieringSplit tieringSplit : pendingSplits) {
+                if (tieringSplit.getTableBucket().getTableId() == 
reachMaxDurationTable) {
+                    // force ignore this tiering split since the tiering for 
this table is timeout,
+                    // we have to force to set to ignore the tiering split so 
that the
+                    // tiering source reader can ignore them directly
+                    tieringSplit.forceIgnore();
+                } else {
+                    // we can break directly, if found any one split's table 
id is not equal to the
+                    // timeout
+                    // table, the following split must be not equal to the 
table id
+                    break;
+                }
+            }
+
+            LOG.debug("Found the table {} reach max tiering duration.", 
reachMaxDurationTable);
+
+            // broadcast the tiering reach max duration event to all readers,
+            // we broadcast all for simplicity
+            Set<Integer> readers = new 
HashSet<>(context.registeredReaders().keySet());
+            for (int reader : readers) {
+                context.sendEventToSourceReader(
+                        reader, new 
TieringReachMaxDurationEvent(reachMaxDurationTable));
+            }

Review Comment:
   The deadline is removed from tieringTablesDeadline when a table finishes or 
fails (lines 240 and 259), but it's never removed when the timeout event is 
sent at line 323. This means a timed-out table will continue triggering timeout 
checks and events on every periodic check interval, potentially sending 
duplicate TieringReachMaxDurationEvent messages to readers. Consider removing 
the deadline after sending the timeout event.
   ```suggestion
               }
   
               // remove the deadline once timeout is handled to avoid duplicate timeout events
               tieringTablesDeadline.remove(reachMaxDurationTable);
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -260,6 +349,11 @@ private void assignSplits() {
                     if (!pendingSplits.isEmpty()) {
                         TieringSplit tieringSplit = pendingSplits.remove(0);
                         context.assignSplit(tieringSplit, nextAwaitingReader);
+                        long tableId = 
tieringSplit.getTableBucket().getTableId();
+                        if (!tieringTablesDeadline.containsKey(tableId)) {
+                            tieringTablesDeadline.put(
+                                    tableId, clock.milliseconds() + 
tieringTableDurationMaxMs);
+                        }

Review Comment:
   The deadline is set only when the first split of a table is assigned (lines 
353-356). However, if a table has multiple buckets processed in parallel by 
different readers, each bucket's split will be assigned at different times, but 
they all share the same deadline. This means some splits may have significantly 
less time to complete than others. Consider setting the deadline when the table 
tiering starts (when splits are generated) rather than when splits are assigned 
to ensure all splits for a table have equal time to complete.
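
   A minimal sketch of the alternative, assuming the deadline is recorded once 
when a table's splits are generated rather than when the first split is 
assigned. TieringDeadlineSketch and its method names are hypothetical and are 
not part of TieringSourceEnumerator; the `deadline < now` comparison mirrors 
checkTableReachMaxTieringDuration above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not taken from the PR: tracks one deadline per table.
final class TieringDeadlineSketch {
    private final Map<Long, Long> deadlines = new HashMap<>();
    private final long maxDurationMs;

    TieringDeadlineSketch(long maxDurationMs) {
        this.maxDurationMs = maxDurationMs;
    }

    // Called when the splits of a table are generated. putIfAbsent keeps the
    // first deadline, so every split of the table shares the same budget.
    void onSplitsGenerated(long tableId, long nowMs) {
        deadlines.putIfAbsent(tableId, nowMs + maxDurationMs);
    }

    // Same strict comparison as the enumerator: timed out once deadline < now.
    boolean isTimedOut(long tableId, long nowMs) {
        Long deadline = deadlines.get(tableId);
        return deadline != null && deadline < nowMs;
    }

    // Called when the table finishes, fails, or its timeout has been handled.
    void clear(long tableId) {
        deadlines.remove(tableId);
    }
}
```

   Clearing the entry once the timeout has been handled would also keep later 
periodic checks from reporting the same table again.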



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java:
##########
@@ -760,4 +750,119 @@ private void verifyTieringSplitAssignment(
         assertThat(allTieringSplits)
                 .allMatch(tieringSplit -> 
tieringSplit.getTablePath().equals(expectedTablePath));
     }
+
+    private TieringSourceEnumerator createDefaultTieringSourceEnumerator(
+            Configuration flussConf, MockSplitEnumeratorContext<TieringSplit> 
context) {
+        return new TieringSourceEnumerator(
+                flussConf,
+                context,
+                500,
+                Duration.ofMinutes(10).toMillis(),
+                Duration.ofSeconds(10).toMillis());
+    }
+
+    private TieringSourceEnumerator 
createTieringSourceEnumeratorWithManualClock(
+            Configuration flussConf,
+            MockSplitEnumeratorContext<TieringSplit> context,
+            ManualClock clock,
+            long tieringTableDurationMaxMs,
+            long tieringTableDurationDetectIntervalMs) {
+        return new TieringSourceEnumerator(
+                flussConf,
+                context,
+                500,
+                tieringTableDurationMaxMs,
+                tieringTableDurationDetectIntervalMs,
+                clock);
+    }
+
+    /**
+     * Get events sent to readers from MockSplitEnumeratorContext using 
reflection.
+     *
+     * @param context the MockSplitEnumeratorContext
+     * @return map of reader ID to list of events sent to that reader
+     */
+    @SuppressWarnings("unchecked")
+    private Map<Integer, List<SourceEvent>> getEventsToReaders(
+            MockSplitEnumeratorContext<TieringSplit> context) {
+        try {
+            Field eventsToReadersField =
+                    
MockSplitEnumeratorContext.class.getDeclaredField("eventsToReaders");
+            eventsToReadersField.setAccessible(true);
+            return (Map<Integer, List<SourceEvent>>) 
eventsToReadersField.get(context);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            // If reflection fails, return empty map - test will still verify 
splits behavior
+            return new HashMap<>();
+        }
+    }
+

Review Comment:
   The reflection-based helper method getEventsToReaders (lines 780-797) is 
defined but never used. It appears to be superseded by the 
MockSplitEnumeratorContext.getSentSourceEvent() method called at line 845. 
Consider removing this unused code to reduce the maintenance burden.
   ```suggestion
   
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -339,30 +417,29 @@ private TableBucketWriteResult<WriteResult> 
completeLakeWriter(
                 checkNotNull(currentTableNumberOfSplits));
     }
 
-    private TableBucketWriteResultWithSplitIds 
forEmptySplits(Set<TieringLogSplit> emptySplits) {
+    private TableBucketWriteResultWithSplitIds 
forEmptySplits(Set<TieringSplit> emptySplits) {
         Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = 
new HashMap<>();
         Map<TableBucket, String> finishedSplitIds = new HashMap<>();
-        for (TieringLogSplit logSplit : emptySplits) {
-            TableBucket tableBucket = logSplit.getTableBucket();
-            finishedSplitIds.put(tableBucket, logSplit.splitId());
+        for (TieringSplit tieringSplit : emptySplits) {
+            TableBucket tableBucket = tieringSplit.getTableBucket();
+            finishedSplitIds.put(tableBucket, tieringSplit.splitId());
             writeResults.put(
                     tableBucket,
                     toTableBucketWriteResult(
-                            logSplit.getTablePath(),
+                            tieringSplit.getTablePath(),
                             tableBucket,
-                            logSplit.getPartitionName(),
+                            tieringSplit.getPartitionName(),
                             null,
-                            logSplit.getStoppingOffset(),
+                            UNKNOW_BUCKET_OFFSET,

Review Comment:
   When completing an empty split that was force-ignored, the logEndOffset is 
set to UNKNOW_BUCKET_OFFSET (-1). This could cause issues downstream when the 
offset is used for checkpointing or resuming tiering operations. Consider using 
a more appropriate value or documenting that -1 is a valid sentinel value for 
force-ignored splits.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -236,6 +277,53 @@ public void handleSourceEvent(int subtaskId, SourceEvent 
sourceEvent) {
         }
     }
 
+    private Set<Long> checkTableReachMaxTieringDuration() {
+        Set<Long> tieringReachMaxDurationTables = new HashSet<>();
+        long currentTime = clock.milliseconds();
+        for (Map.Entry<Long, Long> tieringTableDeadline : 
tieringTablesDeadline.entrySet()) {
+            long tableId = tieringTableDeadline.getKey();
+            long deadline = tieringTableDeadline.getValue();
+            if (deadline < currentTime) {
+                tieringReachMaxDurationTables.add(tableId);
+            }
+        }
+        return tieringReachMaxDurationTables;
+    }
+
+    private void handleReachMaxTieringDurationTables(
+            Set<Long> tieringReachMaxDurationTables, Throwable throwable) {
+        if (throwable != null) {
+            LOG.error("Fail to check tiering timeout tables.", throwable);
+            return;
+        }
+
+        for (Long reachMaxDurationTable : tieringReachMaxDurationTables) {
+            for (TieringSplit tieringSplit : pendingSplits) {
+                if (tieringSplit.getTableBucket().getTableId() == 
reachMaxDurationTable) {
+                    // force ignore this tiering split since the tiering for 
this table is timeout,
+                    // we have to force to set to ignore the tiering split so 
that the
+                    // tiering source reader can ignore them directly
+                    tieringSplit.forceIgnore();
+                } else {
+                    // we can break directly, if found any one split's table 
id is not equal to the
+                    // timeout
+                    // table, the following split must be not equal to the 
table id
+                    break;

Review Comment:
   The logic assumes pendingSplits are ordered by table ID and will break on 
the first mismatch. However, after introducing 
Collections.shuffle(tieringSplits) at line 423, this ordering assumption is no 
longer valid. Splits from different tables can be interleaved, so the break 
statement at line 311 will prevent marking all splits for the timeout table as 
forceIgnore.
   ```suggestion
   
   ```
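
   If splits from different tables can indeed be interleaved in pendingSplits, 
one option is to scan the whole list without an early break. The sketch below 
illustrates that idea only; SplitSketch and ForceIgnoreSketch are hypothetical 
stand-ins for TieringSplit and the enumerator logic, reduced to a table ID and 
a flag.

```java
import java.util.List;

// Hypothetical, simplified split: only the table id and the forceIgnore flag.
final class SplitSketch {
    final long tableId;
    boolean forceIgnore;

    SplitSketch(long tableId) {
        this.tableId = tableId;
    }
}

final class ForceIgnoreSketch {
    // Marks every pending split of the timed-out table and returns how many
    // were marked. There is no early break, so the result does not depend on
    // the order of the list.
    static int markTimedOut(List<SplitSketch> pendingSplits, long timedOutTableId) {
        int marked = 0;
        for (SplitSketch split : pendingSplits) {
            if (split.tableId == timedOutTableId) {
                split.forceIgnore = true;
                marked++;
            }
        }
        return marked;
    }
}
```

   The full scan is linear in the number of pending splits per timed-out table, 
which trades a little work for not relying on any ordering assumption.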



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSnapshotSplit.java:
##########
@@ -101,7 +110,8 @@ public TieringSnapshotSplit copy(int numberOfSplits) {
                 partitionName,
                 snapshotId,
                 logOffsetOfSnapshot,
-                numberOfSplits);
+                numberOfSplits,
+                forceIgnore);
     }
 

Review Comment:
   The copy method preserves the forceIgnore state from the original split. 
However, this is inconsistent with TieringLogSplit.copy(), which does not 
preserve forceIgnore and always defaults it to false. This inconsistency could 
lead to unexpected behavior when different split types are copied.
   ```suggestion
                   numberOfSplits);
       }
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -249,6 +337,7 @@ private void generateAndAssignSplits(
 
     private void assignSplits() {
         /* This method may be called from both addSplitsBack and 
handleSplitRequest, make it thread safe. */
+        // todo: do we need to add lock?

Review Comment:
   The TODO comment at line 340 asks "do we need to add lock?" but this 
question is now more critical given that the timeout checking code (lines 
301-312) accesses pendingSplits without synchronization while assignSplits() 
synchronizes on readersAwaitingSplit. This creates a potential race condition. 
The TODO should either be resolved by adding proper synchronization or be 
updated to reflect the new concurrency concerns.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java:
##########
@@ -43,11 +51,29 @@ public final class TieringSourceReader<WriteResult>
     private final Connection connection;
 
     public TieringSourceReader(
+            
FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
+                    elementsQueue,
             SourceReaderContext context,
             Connection connection,
             LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
+        this(elementsQueue, context, connection, lakeTieringFactory, 
DEFAULT_POLL_TIMEOUT);
+    }

Review Comment:
   The constructor now requires an elementsQueue parameter, changing from 
creating the queue internally to accepting it externally. However, the non-test 
constructor at line 53 doesn't expose a way to customize the queue, which 
limits flexibility for testing or customization. While this is addressed by the 
@VisibleForTesting constructor, consider whether the production code path 
should also allow queue configuration or if it should remain internal.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -248,6 +278,44 @@ private void mayCreateLogScanner() {
         }
     }
 
+    private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>
+            forceCompleteTieringLogRecords() throws IOException {
+        Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = 
new HashMap<>();
+        Map<TableBucket, String> finishedSplitIds = new HashMap<>();
+
+        Iterator<Map.Entry<TableBucket, TieringSplit>> 
currentTieringSplitsIterator =
+                currentTableSplitsByBucket.entrySet().iterator();
+        while (currentTieringSplitsIterator.hasNext()) {
+            Map.Entry<TableBucket, TieringSplit> entry = 
currentTieringSplitsIterator.next();
+            TableBucket bucket = entry.getKey();
+            TieringSplit split = entry.getValue();
+            if (split != null && split.isTieringLogSplit()) {
+                LogOffsetAndTimestamp logOffsetAndTimestamp =
+                        currentTableTieredOffsetAndTimestamp.get(bucket);
+                long logEndOffset =
+                        logOffsetAndTimestamp == null
+                                ? UNKNOW_BUCKET_OFFSET

Review Comment:
   Typo in variable reference: 'UNKNOW_BUCKET_OFFSET' should be 
'UNKNOWN_BUCKET_OFFSET'.
   ```suggestion
                                   ? UNKNOWN_BUCKET_OFFSET
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringReachMaxDurationEvent.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.event;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+import java.util.Objects;
+
+/**
+ * SourceEvent used to notify TieringSourceReader that a table has reached the 
maximum tiering
+ * duration and should be force completed.
+ */
+public class TieringReachMaxDurationEvent implements SourceEvent {

Review Comment:
   Class TieringReachMaxDurationEvent overrides hashCode but not equals.
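
   A minimal sketch of a matching equals/hashCode pair, assuming the event 
carries only a table ID (it is constructed from one in the enumerator). 
MaxDurationEventSketch is a hypothetical stand-in; the real class implements 
Flink's SourceEvent, which is omitted here so the example stays self-contained.

```java
import java.util.Objects;

// Hypothetical stand-in for TieringReachMaxDurationEvent (assumption: the
// only state is the table id).
final class MaxDurationEventSketch {
    private final long tableId;

    MaxDurationEventSketch(long tableId) {
        this.tableId = tableId;
    }

    long getTableId() {
        return tableId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MaxDurationEventSketch)) {
            return false;
        }
        return tableId == ((MaxDurationEventSketch) o).tableId;
    }

    // Derived from the same field as equals, so equal events hash alike.
    @Override
    public int hashCode() {
        return Objects.hash(tableId);
    }
}
```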



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -339,30 +417,29 @@ private TableBucketWriteResult<WriteResult> 
completeLakeWriter(
                 checkNotNull(currentTableNumberOfSplits));
     }
 
-    private TableBucketWriteResultWithSplitIds 
forEmptySplits(Set<TieringLogSplit> emptySplits) {
+    private TableBucketWriteResultWithSplitIds 
forEmptySplits(Set<TieringSplit> emptySplits) {
         Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = 
new HashMap<>();
         Map<TableBucket, String> finishedSplitIds = new HashMap<>();
-        for (TieringLogSplit logSplit : emptySplits) {
-            TableBucket tableBucket = logSplit.getTableBucket();
-            finishedSplitIds.put(tableBucket, logSplit.splitId());
+        for (TieringSplit tieringSplit : emptySplits) {
+            TableBucket tableBucket = tieringSplit.getTableBucket();
+            finishedSplitIds.put(tableBucket, tieringSplit.splitId());
             writeResults.put(
                     tableBucket,
                     toTableBucketWriteResult(
-                            logSplit.getTablePath(),
+                            tieringSplit.getTablePath(),
                             tableBucket,
-                            logSplit.getPartitionName(),
+                            tieringSplit.getPartitionName(),
                             null,
-                            logSplit.getStoppingOffset(),
+                            UNKNOW_BUCKET_OFFSET,

Review Comment:
   Typo in variable reference: 'UNKNOW_BUCKET_OFFSET' should be 
'UNKNOWN_BUCKET_OFFSET'.
   ```suggestion
                               UNKNOWN_BUCKET_OFFSET,
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -324,6 +418,9 @@ private void generateTieringSplits(Tuple3<Long, Long, 
TablePath> tieringTable)
             List<TieringSplit> tieringSplits =
                     populateNumberOfTieringSplits(
                             
splitGenerator.generateTableSplits(tieringTable.f2));
+            // shuffle tiering split to avoid splits tiering skew
+            // after introduce tiering max duration
+            Collections.shuffle(tieringSplits);

Review Comment:
   Shuffling the tiering splits (line 423) aims to avoid splits tiering skew 
after introducing tiering max duration. However, this shuffle happens at the 
table level, meaning all splits for a table are shuffled together before being 
added to pendingSplits. Since pendingSplits is processed FIFO (line 350), and 
multiple tables may be tiering concurrently, this shuffle provides limited 
benefit for balancing work across readers when multiple tables are involved. 
Consider documenting this limitation or implementing a more sophisticated load 
balancing strategy.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializer.java:
##########
@@ -74,6 +74,9 @@ public byte[] serialize(TieringSplit split) throws 
IOException {
             out.writeBoolean(false);
         }
 
+        // write force ignore
+        out.writeBoolean(split.isForceIgnore());

Review Comment:
   The serialization format has changed by adding a new forceIgnore boolean 
field (line 78), but the version number remains at VERSION_0. This breaks 
backward compatibility: older deserializers will fail when reading data 
serialized by the new version, and the new deserializer will fail when reading 
old serialized data that doesn't have the forceIgnore field. Consider bumping 
the version to VERSION_1 and handling both old and new formats in the 
deserialize method for backward compatibility.
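
   A rough sketch of version-aware deserialization, under the assumption that 
the old layout simply lacks the trailing forceIgnore boolean. The class below 
is hypothetical and reduces a split to (tableId, forceIgnore); it is not the 
actual TieringSplitSerializer, and it wraps IOException in UncheckedIOException 
only to keep the example short.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, reduced serializer used only to illustrate version handling.
final class VersionedSplitSerializerSketch {
    static final int VERSION_0 = 0; // old layout: tableId only
    static final int VERSION_1 = 1; // new layout: tableId + forceIgnore

    static final class Split {
        final long tableId;
        final boolean forceIgnore;

        Split(long tableId, boolean forceIgnore) {
            this.tableId = tableId;
            this.forceIgnore = forceIgnore;
        }
    }

    // Always writes the newest layout.
    static byte[] serialize(Split split) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(split.tableId);
            out.writeBoolean(split.forceIgnore);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads either layout; data written as VERSION_0 defaults forceIgnore to false.
    static Split deserialize(int version, byte[] serialized) {
        if (version != VERSION_0 && version != VERSION_1) {
            throw new IllegalArgumentException("Unknown version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            long tableId = in.readLong();
            boolean forceIgnore = version >= VERSION_1 && in.readBoolean();
            return new Split(tableId, forceIgnore);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

   With this shape, state written before the change still restores, and the new 
field is only read when the recorded version says it is present.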



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
