Copilot commented on code in PR #2185:
URL: https://github.com/apache/fluss/pull/2185#discussion_r2626793012

##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.source;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.tiering.TestingLakeTieringFactory;
+import org.apache.fluss.flink.tiering.TestingWriteResult;
+import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
+import org.apache.fluss.flink.tiering.source.split.TieringLogSplit;
+import org.apache.fluss.flink.utils.FlinkTestBase;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link TieringSourceReader}. */
+class TieringSourceReaderTest extends FlinkTestBase {
+
+    @Test
+    void testHandleTieringReachMaxDurationEvent1() throws Exception {

Review Comment:
The test method name 'testHandleTieringReachMaxDurationEvent1' has a '1' suffix, which usually signals either a placeholder name or a family of similar tests that never materialized. Consider a more descriptive name such as 'testHandleTieringReachMaxDurationEventWithForceIgnore' or 'testMaxDurationEventWithEmptyAndNonEmptySplits' so the name states what the test verifies.

```suggestion
    void testHandleTieringReachMaxDurationEventWithForceIgnore() throws Exception {
```


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -64,18 +64,24 @@ public class TieringSplitReader<WriteResult>
 
     private static final Logger LOG = LoggerFactory.getLogger(TieringSplitReader.class);
 
-    private static final Duration POLL_TIMEOUT = Duration.ofMillis(10000L);
+    public static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(10_000L);
 
     // unknown bucket timestamp for empty split or snapshot split
    private static final long UNKNOWN_BUCKET_TIMESTAMP = -1;
 
+    private static final long UNKNOW_BUCKET_OFFSET = -1;
+

Review Comment:
Typo in constant name: 'UNKNOW_BUCKET_OFFSET' should be 'UNKNOWN_BUCKET_OFFSET' to be consistent with UNKNOWN_BUCKET_TIMESTAMP. Since the constant is private and new in this PR, it can simply be renamed.

```suggestion
    // unknown bucket offset for empty split or snapshot split
    private static final long UNKNOWN_BUCKET_OFFSET = -1;
```


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceOptions.java:
##########
@@ -34,4 +34,21 @@ public class TieringSourceOptions {
                     .defaultValue(Duration.ofSeconds(30))
                     .withDescription(
                             "The fixed interval to request tiering table from Fluss cluster, by default 30 seconds.");
+
+    public static final ConfigOption<Duration> TIERING_TABLE_DURATION_MAX =
+            key("tiering.table.duration.max")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(10))
+                    .withDescription(
+                            "The maximum duration for tiering a single table. If tiering a table exceeds this duration, "
+                                    + "it will be force completed: the tiering will be finalized and committed to the data lake "
+                                    + "(e.g., Paimon) immediately, even if they haven't reached their desired stopping offsets.");

Review Comment:
The description says tiering will be "force completed" and "committed to the data lake (e.g., Paimon) immediately, even if they haven't reached their desired stopping offsets." However, the implementation skips splits marked with forceIgnore entirely (TieringSplitReader lines 181-184) rather than force-completing them. This discrepancy between the documentation and the implementation could mislead users. Consider clarifying that timed-out splits may be either force-completed (if already in progress) or skipped (if not yet started).
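For example, the description could be reworded along these lines (an illustrative sketch; the key, type, and default come from the diff, the wording is not part of the PR):

```java
// Illustrative rewording only; not the PR's actual description text.
public static final ConfigOption<Duration> TIERING_TABLE_DURATION_MAX =
        key("tiering.table.duration.max")
                .durationType()
                .defaultValue(Duration.ofMinutes(10))
                .withDescription(
                        "The maximum duration for tiering a single table. When this duration is exceeded, "
                                + "in-progress splits are finalized and committed to the data lake at the offsets "
                                + "they have reached so far, while splits that have not started tiering yet are skipped.");
```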
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -236,6 +277,53 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
         }
     }
 
+    private Set<Long> checkTableReachMaxTieringDuration() {
+        Set<Long> tieringReachMaxDurationTables = new HashSet<>();
+        long currentTime = clock.milliseconds();
+        for (Map.Entry<Long, Long> tieringTableDeadline : tieringTablesDeadline.entrySet()) {
+            long tableId = tieringTableDeadline.getKey();
+            long deadline = tieringTableDeadline.getValue();
+            if (deadline < currentTime) {
+                tieringReachMaxDurationTables.add(tableId);
+            }
+        }
+        return tieringReachMaxDurationTables;
+    }
+
+    private void handleReachMaxTieringDurationTables(
+            Set<Long> tieringReachMaxDurationTables, Throwable throwable) {
+        if (throwable != null) {
+            LOG.error("Fail to check tiering timeout tables.", throwable);
+            return;
+        }
+
+        for (Long reachMaxDurationTable : tieringReachMaxDurationTables) {
+            for (TieringSplit tieringSplit : pendingSplits) {
+                if (tieringSplit.getTableBucket().getTableId() == reachMaxDurationTable) {
+                    // force ignore this tiering split since the tiering for this table is timeout,
+                    // we have to force to set to ignore the tiering split so that the
+                    // tiering source reader can ignore them directly
+                    tieringSplit.forceIgnore();
+                } else {
+                    // we can break directly, if found any one split's table id is not equal to the
+                    // timeout
+                    // table, the following split must be not equal to the table id
+                    break;

Review Comment:
The pendingSplits list is accessed both from the periodic timeout check (line 301) and from assignSplits (line 350), which can be invoked from different threads via handleSplitRequest and addSplitsBack. While assignSplits synchronizes on readersAwaitingSplit, the timeout check uses no synchronization, so iterating and mutating pendingSplits concurrently can throw a ConcurrentModificationException or leave the list in an inconsistent state.

```suggestion
            // Access to pendingSplits must be synchronized consistently with other paths
            // (e.g., assignSplits) to avoid concurrent modification.
            synchronized (readersAwaitingSplit) {
                for (TieringSplit tieringSplit : pendingSplits) {
                    if (tieringSplit.getTableBucket().getTableId() == reachMaxDurationTable) {
                        // force ignore this tiering split since the tiering for this table is
                        // timeout, we have to force to set to ignore the tiering split so that the
                        // tiering source reader can ignore them directly
                        tieringSplit.forceIgnore();
                    } else {
                        // we can break directly, if found any one split's table id is not equal to
                        // the timeout table, the following split must be not equal to the table id
                        break;
                    }
```


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -236,6 +277,53 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
         }
     }
 
+    private Set<Long> checkTableReachMaxTieringDuration() {
+        Set<Long> tieringReachMaxDurationTables = new HashSet<>();
+        long currentTime = clock.milliseconds();
+        for (Map.Entry<Long, Long> tieringTableDeadline : tieringTablesDeadline.entrySet()) {
+            long tableId = tieringTableDeadline.getKey();
+            long deadline = tieringTableDeadline.getValue();
+            if (deadline < currentTime) {
+                tieringReachMaxDurationTables.add(tableId);
+            }
+        }
+        return tieringReachMaxDurationTables;
+    }
+
+    private void handleReachMaxTieringDurationTables(
+            Set<Long> tieringReachMaxDurationTables, Throwable throwable) {
+        if (throwable != null) {
+            LOG.error("Fail to check tiering timeout tables.", throwable);
+            return;
+        }
+
+        for (Long reachMaxDurationTable : tieringReachMaxDurationTables) {
+            for (TieringSplit tieringSplit : pendingSplits) {
+                if (tieringSplit.getTableBucket().getTableId() == reachMaxDurationTable) {
+                    // force ignore this tiering split since the tiering for this table is timeout,
+                    // we have to force to set to ignore the tiering split so that the
+                    // tiering source reader can ignore them directly
+                    tieringSplit.forceIgnore();
+                } else {
+                    // we can break directly, if found any one split's table id is not equal to the
+                    // timeout
+                    // table, the following split must be not equal to the table id
+                    break;
+                }
+            }
+
+            LOG.debug("Found the table {} reach max tiering duration.", reachMaxDurationTable);
+
+            // broadcast the tiering reach max duration event to all readers,
+            // we broadcast all for simplicity
+            Set<Integer> readers = new HashSet<>(context.registeredReaders().keySet());
+            for (int reader : readers) {
+                context.sendEventToSourceReader(
+                        reader, new TieringReachMaxDurationEvent(reachMaxDurationTable));
+            }

Review Comment:
The deadline is removed from tieringTablesDeadline when a table finishes or fails (lines 240 and 259), but never when the timeout event is sent at line 323. A timed-out table will therefore keep triggering the timeout check on every detect interval, sending duplicate TieringReachMaxDurationEvent messages to the readers. Consider removing the deadline after sending the timeout event.
```suggestion
            }

            // remove the deadline once timeout is handled to avoid duplicate timeout events
            tieringTablesDeadline.remove(reachMaxDurationTable);
```


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -260,6 +349,11 @@ private void assignSplits() {
             if (!pendingSplits.isEmpty()) {
                 TieringSplit tieringSplit = pendingSplits.remove(0);
                 context.assignSplit(tieringSplit, nextAwaitingReader);
+                long tableId = tieringSplit.getTableBucket().getTableId();
+                if (!tieringTablesDeadline.containsKey(tableId)) {
+                    tieringTablesDeadline.put(
+                            tableId, clock.milliseconds() + tieringTableDurationMaxMs);
+                }

Review Comment:
The deadline is set only when the first split of a table is assigned (lines 353-356). If a table has multiple buckets processed in parallel by different readers, each bucket's split is assigned at a different time, yet they all share the deadline derived from the first assignment, so later-assigned splits have significantly less time to complete. Consider setting the deadline when the table tiering starts (when splits are generated) rather than when splits are assigned, so all splits of a table get equal time to complete.


##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java:
##########
@@ -760,4 +750,119 @@ private void verifyTieringSplitAssignment(
         assertThat(allTieringSplits)
                 .allMatch(tieringSplit -> tieringSplit.getTablePath().equals(expectedTablePath));
     }
+
+    private TieringSourceEnumerator createDefaultTieringSourceEnumerator(
+            Configuration flussConf, MockSplitEnumeratorContext<TieringSplit> context) {
+        return new TieringSourceEnumerator(
+                flussConf,
+                context,
+                500,
+                Duration.ofMinutes(10).toMillis(),
+                Duration.ofSeconds(10).toMillis());
+    }
+
+    private TieringSourceEnumerator createTieringSourceEnumeratorWithManualClock(
+            Configuration flussConf,
+            MockSplitEnumeratorContext<TieringSplit> context,
+            ManualClock clock,
+            long tieringTableDurationMaxMs,
+            long tieringTableDurationDetectIntervalMs) {
+        return new TieringSourceEnumerator(
+                flussConf,
+                context,
+                500,
+                tieringTableDurationMaxMs,
+                tieringTableDurationDetectIntervalMs,
+                clock);
+    }
+
+    /**
+     * Get events sent to readers from MockSplitEnumeratorContext using reflection.
+     *
+     * @param context the MockSplitEnumeratorContext
+     * @return map of reader ID to list of events sent to that reader
+     */
+    @SuppressWarnings("unchecked")
+    private Map<Integer, List<SourceEvent>> getEventsToReaders(
+            MockSplitEnumeratorContext<TieringSplit> context) {
+        try {
+            Field eventsToReadersField =
+                    MockSplitEnumeratorContext.class.getDeclaredField("eventsToReaders");
+            eventsToReadersField.setAccessible(true);
+            return (Map<Integer, List<SourceEvent>>) eventsToReadersField.get(context);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            // If reflection fails, return empty map - test will still verify splits behavior
+            return new HashMap<>();
+        }
+    }
+

Review Comment:
The reflection-based helper getEventsToReaders (lines 780-797) is defined but never used; it appears to be superseded by the MockSplitEnumeratorContext.getSentSourceEvent() call at line 845. Consider removing this unused code to reduce the maintenance burden.

```suggestion
```


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -339,30 +417,29 @@ private TableBucketWriteResult<WriteResult> completeLakeWriter(
                 checkNotNull(currentTableNumberOfSplits));
     }
 
-    private TableBucketWriteResultWithSplitIds forEmptySplits(Set<TieringLogSplit> emptySplits) {
+    private TableBucketWriteResultWithSplitIds forEmptySplits(Set<TieringSplit> emptySplits) {
         Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
         Map<TableBucket, String> finishedSplitIds = new HashMap<>();
-        for (TieringLogSplit logSplit : emptySplits) {
-            TableBucket tableBucket = logSplit.getTableBucket();
-            finishedSplitIds.put(tableBucket, logSplit.splitId());
+        for (TieringSplit tieringSplit : emptySplits) {
+            TableBucket tableBucket = tieringSplit.getTableBucket();
+            finishedSplitIds.put(tableBucket, tieringSplit.splitId());
             writeResults.put(
                     tableBucket,
                     toTableBucketWriteResult(
-                            logSplit.getTablePath(),
+                            tieringSplit.getTablePath(),
                             tableBucket,
-                            logSplit.getPartitionName(),
+                            tieringSplit.getPartitionName(),
                             null,
-                            logSplit.getStoppingOffset(),
+                            UNKNOW_BUCKET_OFFSET,

Review Comment:
When completing an empty split that was force-ignored, the logEndOffset is set to UNKNOW_BUCKET_OFFSET (-1). This could cause issues downstream when the offset is used for checkpointing or resuming tiering. Consider using a more appropriate value, or documenting that -1 is a valid sentinel for force-ignored splits.


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -236,6 +277,53 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
         }
     }
 
+    private Set<Long> checkTableReachMaxTieringDuration() {
+        Set<Long> tieringReachMaxDurationTables = new HashSet<>();
+        long currentTime = clock.milliseconds();
+        for (Map.Entry<Long, Long> tieringTableDeadline : tieringTablesDeadline.entrySet()) {
+            long tableId = tieringTableDeadline.getKey();
+            long deadline = tieringTableDeadline.getValue();
+            if (deadline < currentTime) {
+                tieringReachMaxDurationTables.add(tableId);
+            }
+        }
+        return tieringReachMaxDurationTables;
+    }
+
+    private void handleReachMaxTieringDurationTables(
+            Set<Long> tieringReachMaxDurationTables, Throwable throwable) {
+        if (throwable != null) {
+            LOG.error("Fail to check tiering timeout tables.", throwable);
+            return;
+        }
+
+        for (Long reachMaxDurationTable : tieringReachMaxDurationTables) {
+            for (TieringSplit tieringSplit : pendingSplits) {
+                if (tieringSplit.getTableBucket().getTableId() == reachMaxDurationTable) {
+                    // force ignore this tiering split since the tiering for this table is timeout,
+                    // we have to force to set to ignore the tiering split so that the
+                    // tiering source reader can ignore them directly
+                    tieringSplit.forceIgnore();
+                } else {
+                    // we can break directly, if found any one split's table id is not equal to the
+                    // timeout
+                    // table, the following split must be not equal to the table id
+                    break;

Review Comment:
This logic assumes pendingSplits is ordered by table ID and breaks on the first mismatch. After introducing Collections.shuffle(tieringSplits) at line 423, that ordering assumption no longer holds: splits from different tables can be interleaved, so the break at line 311 will prevent marking all splits of the timed-out table as forceIgnore.
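A minimal sketch of ordering-independent marking (the synchronization question raised above is left aside; all identifiers come from the diff):

```java
// Sketch: with pendingSplits shuffled at generation time, splits of a single
// table are no longer guaranteed to be contiguous, so scan the whole list.
for (TieringSplit tieringSplit : pendingSplits) {
    if (tieringSplit.getTableBucket().getTableId() == reachMaxDurationTable) {
        // mark the split so that readers skip it
        tieringSplit.forceIgnore();
    }
    // no early break: a non-matching split no longer implies the rest won't match
}
```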
```suggestion
```


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSnapshotSplit.java:
##########
@@ -101,7 +110,8 @@ public TieringSnapshotSplit copy(int numberOfSplits) {
                 partitionName,
                 snapshotId,
                 logOffsetOfSnapshot,
-                numberOfSplits);
+                numberOfSplits,
+                forceIgnore);
     }

Review Comment:
The copy method preserves the forceIgnore state of the original split. This is inconsistent with TieringLogSplit.copy(), which does not preserve forceIgnore and always defaults it to false. The inconsistency could lead to unexpected behavior when different split types are copied.

```suggestion
                numberOfSplits);
    }
```


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -249,6 +337,7 @@ private void generateAndAssignSplits(
 
     private void assignSplits() {
         /* This method may be called from both addSplitsBack and handleSplitRequest, make it thread safe. */
+        // todo: do we need to add lock?

Review Comment:
The TODO at line 340 asks "do we need to add lock?", and the question is now more pressing: the timeout-checking code (lines 301-312) accesses pendingSplits without synchronization while assignSplits() synchronizes on readersAwaitingSplit, creating a potential race condition. Either resolve the TODO by adding proper synchronization or update it to reflect the new concurrency concern.


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java:
##########
@@ -43,11 +51,29 @@ public final class TieringSourceReader<WriteResult>
 
     private final Connection connection;
 
     public TieringSourceReader(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
+                    elementsQueue,
             SourceReaderContext context,
             Connection connection,
             LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
+        this(elementsQueue, context, connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT);
+    }

Review Comment:
The constructor now accepts the elementsQueue from outside instead of creating it internally. The non-test constructor at line 53 still offers no way to customize the queue, which limits flexibility for testing or customization. The @VisibleForTesting constructor covers the test case, but consider whether the production path should also allow queue configuration or keep the queue internal.


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -248,6 +278,44 @@ private void mayCreateLogScanner() {
         }
     }
 
+    private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>
+            forceCompleteTieringLogRecords() throws IOException {
+        Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
+        Map<TableBucket, String> finishedSplitIds = new HashMap<>();
+
+        Iterator<Map.Entry<TableBucket, TieringSplit>> currentTieringSplitsIterator =
+                currentTableSplitsByBucket.entrySet().iterator();
+        while (currentTieringSplitsIterator.hasNext()) {
+            Map.Entry<TableBucket, TieringSplit> entry = currentTieringSplitsIterator.next();
+            TableBucket bucket = entry.getKey();
+            TieringSplit split = entry.getValue();
+            if (split != null && split.isTieringLogSplit()) {
+                LogOffsetAndTimestamp logOffsetAndTimestamp =
+                        currentTableTieredOffsetAndTimestamp.get(bucket);
+                long logEndOffset =
+                        logOffsetAndTimestamp == null
+                                ? UNKNOW_BUCKET_OFFSET

Review Comment:
Typo in variable reference: 'UNKNOW_BUCKET_OFFSET' should be 'UNKNOWN_BUCKET_OFFSET'.

```suggestion
                                ? UNKNOWN_BUCKET_OFFSET
```


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringReachMaxDurationEvent.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.event;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+import java.util.Objects;
+
+/**
+ * SourceEvent used to notify TieringSourceReader that a table has reached the maximum tiering
+ * duration and should be force completed.
+ */
+public class TieringReachMaxDurationEvent implements SourceEvent {

Review Comment:
Class TieringReachMaxDurationEvent overrides hashCode but not equals, which violates the equals/hashCode contract; a class should override both consistently or neither.


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -339,30 +417,29 @@ private TableBucketWriteResult<WriteResult> completeLakeWriter(
                 checkNotNull(currentTableNumberOfSplits));
     }
 
-    private TableBucketWriteResultWithSplitIds forEmptySplits(Set<TieringLogSplit> emptySplits) {
+    private TableBucketWriteResultWithSplitIds forEmptySplits(Set<TieringSplit> emptySplits) {
         Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
         Map<TableBucket, String> finishedSplitIds = new HashMap<>();
-        for (TieringLogSplit logSplit : emptySplits) {
-            TableBucket tableBucket = logSplit.getTableBucket();
-            finishedSplitIds.put(tableBucket, logSplit.splitId());
+        for (TieringSplit tieringSplit : emptySplits) {
+            TableBucket tableBucket = tieringSplit.getTableBucket();
+            finishedSplitIds.put(tableBucket, tieringSplit.splitId());
             writeResults.put(
                     tableBucket,
                     toTableBucketWriteResult(
-                            logSplit.getTablePath(),
+                            tieringSplit.getTablePath(),
                             tableBucket,
-                            logSplit.getPartitionName(),
+                            tieringSplit.getPartitionName(),
                             null,
-                            logSplit.getStoppingOffset(),
+                            UNKNOW_BUCKET_OFFSET,

Review Comment:
Typo in variable reference: 'UNKNOW_BUCKET_OFFSET' should be 'UNKNOWN_BUCKET_OFFSET'.

```suggestion
                            UNKNOWN_BUCKET_OFFSET,
```


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -324,6 +418,9 @@ private void generateTieringSplits(Tuple3<Long, Long, TablePath> tieringTable)
         List<TieringSplit> tieringSplits =
                 populateNumberOfTieringSplits(
                         splitGenerator.generateTableSplits(tieringTable.f2));
+        // shuffle tiering split to avoid splits tiering skew
+        // after introduce tiering max duration
+        Collections.shuffle(tieringSplits);

Review Comment:
Shuffling the tiering splits (line 423) aims to avoid split tiering skew after introducing the tiering max duration.
However, this shuffle happens at the table level: all splits for one table are shuffled together before being appended to pendingSplits. Since pendingSplits is processed FIFO (line 350) and multiple tables may be tiering concurrently, the shuffle provides limited benefit for balancing work across readers when multiple tables are involved. Consider documenting this limitation or implementing a more sophisticated load-balancing strategy.


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializer.java:
##########
@@ -74,6 +74,9 @@ public byte[] serialize(TieringSplit split) throws IOException {
             out.writeBoolean(false);
         }
 
+        // write force ignore
+        out.writeBoolean(split.isForceIgnore());

Review Comment:
The serialization format changes by appending a new forceIgnore boolean (line 78), but the version number remains at VERSION_0. This breaks compatibility in both directions: an older deserializer will fail on data written by the new version, and the new deserializer will fail on old data that lacks the forceIgnore field. Consider bumping the version to VERSION_1 and handling both formats in the deserialize method.
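A minimal sketch of version-aware handling, following Flink's SimpleVersionedSerializer contract (readCommonFields is a hypothetical helper standing in for the serializer's existing per-field reads):

```java
// Sketch only: assumes TieringSplitSerializer implements Flink's
// SimpleVersionedSerializer<TieringSplit>.
private static final int VERSION_0 = 0; // legacy format without the forceIgnore flag
private static final int VERSION_1 = 1; // appends the forceIgnore boolean

@Override
public int getVersion() {
    return VERSION_1; // always serialize in the newest format
}

@Override
public TieringSplit deserialize(int version, byte[] serialized) throws IOException {
    DataInputDeserializer in = new DataInputDeserializer(serialized);
    // hypothetical helper: reads the fields shared by both versions
    TieringSplit split = readCommonFields(in);
    // old data carries no trailing boolean, so forceIgnore defaults to false
    if (version >= VERSION_1 && in.readBoolean()) {
        split.forceIgnore();
    }
    return split;
}
```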
