Copilot commented on code in PR #2510:
URL: https://github.com/apache/fluss/pull/2510#discussion_r2741306455


##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java:
##########
@@ -962,4 +1009,23 @@ private Map<Integer, Integer> putRows(TablePath 
tablePath, int rowsNum) throws E
         }
         return bucketRows;
     }
+
+    private static class MockBacklogSplitEnumeratorContext
+            extends MockSplitEnumeratorContext<SourceSplitBase> {
+
+        private volatile boolean isProcessingBacklogCalled;

Review Comment:
   The field name `isProcessingBacklogCalled` is misleading. It suggests it 
tracks whether a method was called, but it actually stores the state value 
passed to `setIsProcessingBacklog()`. This naming inconsistency could confuse 
developers.
   
   Consider renaming to `processingBacklogState` or `isProcessingBacklogValue` 
to better reflect that it stores the state value, not call tracking information.



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java:
##########
@@ -962,4 +1009,23 @@ private Map<Integer, Integer> putRows(TablePath 
tablePath, int rowsNum) throws E
         }
         return bucketRows;
     }
+
+    private static class MockBacklogSplitEnumeratorContext
+            extends MockSplitEnumeratorContext<SourceSplitBase> {
+
+        private volatile boolean isProcessingBacklogCalled;
+
+        public MockBacklogSplitEnumeratorContext(int parallelism) {
+            super(parallelism);
+        }
+
+        @Override
+        public void setIsProcessingBacklog(boolean isProcessingBacklog) {
+            this.isProcessingBacklogCalled = isProcessingBacklog;
+        }
+
+        public boolean isProcessingBacklogCalled() {
+            return isProcessingBacklogCalled;
+        }

Review Comment:
   The method name `isProcessingBacklogCalled()` is misleading. It suggests it 
returns whether the method was called, but it actually returns the last value 
that was passed to `setIsProcessingBacklog()`. This naming could confuse 
developers maintaining this test.
   
   Consider renaming to `getIsProcessingBacklog()` or 
`getProcessingBacklogState()` to accurately reflect that it returns the current 
state value, not whether a method was called.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java:
##########
@@ -517,10 +581,34 @@ private FlinkRecordsWithSplitIds forBoundedSplitRecords(
                 flinkSourceReaderMetrics);
     }
 
+    private void sendBacklogFinishedEvent(Set<TableBucket> 
backlogFinishedTbls) {
+        if (backlogFinishedTbls.isEmpty() || context == null) {
+            String msg =
+                    backlogFinishedTbls.isEmpty()
+                            ? String.format(
+                                    "No table bucket finished backlog phase 
for tableId = %s",
+                                    table.getTableInfo().getTableId())
+                            : "context is null then no operator event could be 
sent ";
+            LOG.warn(msg);
+            return;
+        }

Review Comment:
   The warning message is misleading when `backlogFinishedTbls` is empty. This 
method is called multiple times during normal operation (e.g., from lines 196, 
546, etc.), and it's expected that the set might be empty in some cases. The 
warning suggests something is wrong, but empty sets are a valid scenario.
   
   Consider either removing the warning when the set is empty, or changing it 
to a debug-level log since this is normal behavior, not a warning condition.
   ```suggestion
           if (context == null) {
               LOG.warn("context is null then no operator event could be sent 
");
               return;
           }
   
           if (backlogFinishedTbls.isEmpty()) {
               if (LOG.isDebugEnabled()) {
                   LOG.debug(
                           "No table bucket finished backlog phase for tableId 
= {}",
                           table.getTableInfo().getTableId());
               }
               return;
           }
   ```



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java:
##########
@@ -759,6 +760,52 @@ void testPartitionsExpiredInFlussButExistInLake(
         }
     }
 
+    @Test
+    void testReportBacklogStatus() throws Throwable {

Review Comment:
   The test name `testReportBacklogStatus` doesn't accurately describe what 
it's testing. The test actually verifies that the enumerator correctly handles 
BacklogFinishEvent messages and updates the processing backlog status, which is 
more about event handling and state management rather than "reporting" status 
(which typically means publishing/exposing status to external systems).
   
   Consider renaming to something more descriptive like 
`testHandleBacklogFinishEventAndUpdateStatus` or 
`testBacklogStatusTransitionOnEventCompletion` to better reflect the actual 
test behavior.
   ```suggestion
       void testHandleBacklogFinishEventAndUpdateStatus() throws Throwable {
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java:
##########
@@ -159,12 +186,14 @@ public RecordsWithSplitIds<RecordAndPos> fetch() throws 
IOException {
                 return forBoundedSplitRecords(currentBoundedSplit, 
recordIterator);
             }
         } else {
+            sendBacklogFinishedEvent(onlySnapshotBuckets);
             // may need to finish empty log splits
             if (!emptyLogSplits.isEmpty()) {
                 FlinkRecordsWithSplitIds records =
                         new FlinkRecordsWithSplitIds(
                                 new HashSet<>(emptyLogSplits), 
flinkSourceReaderMetrics);
                 emptyLogSplits.clear();
+                sendBacklogFinishedEvent(subscribedBuckets.keySet());

Review Comment:
   Backlog finish events are being sent for all subscribed buckets when 
processing empty log splits, but this could include buckets that have backlog 
offsets and haven't reached them yet. This could cause the enumerator to 
prematurely mark backlog as finished for buckets that are still processing 
historical data.
   
   Consider filtering to only send events for buckets that either don't have a 
backlog marked offset or have already sent their backlog finish event. For 
example, filter `subscribedBuckets.keySet()` to exclude buckets present in 
`backlogMarkedOffsets` but not in `backlogEventSentTbls`.
   ```suggestion
   
                   // Only send backlog finish events for buckets that either 
don't have a
                   // backlog marked offset or have already sent their backlog 
finish event.
                   Set<TableBucket> bucketsToNotify = new 
HashSet<>(subscribedBuckets.keySet());
                   bucketsToNotify.removeIf(
                           bucket ->
                                   backlogMarkedOffsets.containsKey(bucket)
                                           && 
!backlogEventSentTbls.contains(bucket));
                   if (!bucketsToNotify.isEmpty()) {
                       sendBacklogFinishedEvent(bucketsToNotify);
                   }
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -307,6 +315,18 @@ public void start() {
         }
     }
 
+    private void initializeBacklog() {
+        context.setIsProcessingBacklog(true);
+        hasBacklogTbls.clear();
+        try {
+            recordBacklogBoundaryOffsets();
+        } catch (Exception e) {
+            throw new FlinkRuntimeException(
+                    String.format("Failed to record initial end offsets for 
table: %s", tablePath),
+                    ExceptionUtils.stripCompletionException(e));
+        }
+    }

Review Comment:
   When the table is empty or all buckets have no data (offset <= 0), 
`hasBacklogTbls` will be empty after `recordBacklogBoundaryOffsets()` 
completes. However, `isProcessingBacklog` is set to `true` at line 319 and 
remains true until a BacklogFinishEvent is received. For tables with no backlog 
data, `isProcessingBacklog` should be set to `false` immediately after checking 
backlog boundaries.
   
   Consider adding a check after `recordBacklogBoundaryOffsets()`: if 
`hasBacklogTbls.isEmpty()`, set `context.setIsProcessingBacklog(false)` 
immediately rather than waiting for events that may never come (or come from 
buckets that were never tracked).



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java:
##########
@@ -483,6 +542,11 @@ public String next() {
                         finishedSplits,
                         flinkSourceReaderMetrics);
         
stoppingOffsets.forEach(recordsWithSplitIds::setTableBucketStoppingOffset);
+        if (scanRecords.isEmpty()) {
+            sendBacklogFinishedEvent(subscribedBuckets.keySet());

Review Comment:
   Backlog finish events are being sent for all subscribed buckets when scan 
records are empty, but empty scan results can occur transiently (e.g., during a 
poll timeout) and don't necessarily mean all buckets have finished their 
backlog. This could cause the enumerator to prematurely mark backlog as 
finished for buckets that still have historical data to process but temporarily 
returned no records.
   
   Consider only sending backlog finish events for buckets that don't have a 
backlog marked offset tracked in `backlogMarkedOffsets`. Buckets with tracked 
backlog offsets should only send finish events when they reach their backlog 
boundary through the normal path (lines 507-510).
   ```suggestion
               // Only send backlog finished events for buckets that do not 
have a tracked
               // backlog offset. Buckets with tracked backlog offsets should 
only send
               // backlog finished events when they reach their backlog 
boundary through
               // the normal path (see handling around backlogFinishedTbl).
               Set<TableBucket> bucketsWithoutBacklogMark =
                       new HashSet<>(subscribedBuckets.keySet());
               
bucketsWithoutBacklogMark.removeAll(backlogMarkedOffsets.keySet());
               if (!bucketsWithoutBacklogMark.isEmpty()) {
                   sendBacklogFinishedEvent(bucketsWithoutBacklogMark);
               }
   ```



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java:
##########
@@ -759,6 +760,52 @@ void testPartitionsExpiredInFlussButExistInLake(
         }
     }
 
+    @Test
+    void testReportBacklogStatus() throws Throwable {
+        long tableId = createTable(DEFAULT_TABLE_PATH, 
DEFAULT_PK_TABLE_DESCRIPTOR);
+        int numSubtasks = 3;
+
+        try (MockBacklogSplitEnumeratorContext context =
+                new MockBacklogSplitEnumeratorContext(numSubtasks)) {
+            FlinkSourceEnumerator enumerator =
+                    new FlinkSourceEnumerator(
+                            DEFAULT_TABLE_PATH,
+                            flussConf,
+                            true,
+                            false,
+                            context,
+                            OffsetsInitializer.full(),
+                            DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
+                            streaming,
+                            null,
+                            null);
+            enumerator.start();
+            assertThat(context.isProcessingBacklogCalled()).isTrue();
+
+            // Register readers
+            for (int i = 0; i < numSubtasks; i++) {
+                registerReader(context, enumerator, i);
+            }
+
+            context.runNextOneTimeCallable();
+
+            // Simulate BacklogFinishEvent from reader
+            TableBucket bucket0 = new TableBucket(tableId, 0);
+            TableBucket bucket1 = new TableBucket(tableId, 1);
+            TableBucket bucket2 = new TableBucket(tableId, 2);
+
+            BacklogFinishEvent event0 = new BacklogFinishEvent(bucket0);
+            BacklogFinishEvent event1 = new BacklogFinishEvent(bucket1);
+            BacklogFinishEvent event2 = new BacklogFinishEvent(bucket2);
+
+            enumerator.handleSourceEvent(0, event0);
+            enumerator.handleSourceEvent(1, event1);
+            enumerator.handleSourceEvent(2, event2);
+
+            assertThat(context.isProcessingBacklogCalled()).isFalse();
+        }
+    }

Review Comment:
   This test creates an empty table (no data inserted) and expects to receive 
BacklogFinishEvents for all buckets. However, for an empty table, no buckets 
will have backlog offsets tracked in `hasBacklogTbls` (see line 1102 in 
FlinkSourceEnumerator - only buckets with offset > 0 are tracked). The test 
currently passes because the reader sends BacklogFinishEvents for all 
subscribed buckets in certain scenarios (lines 196, 546), but these are being 
flagged as bugs that incorrectly send events for buckets with active backlog.
   
   Consider either: (1) inserting data into the table so buckets have actual 
backlog to track, or (2) explicitly testing the empty-table scenario as a 
separate test case that verifies `isProcessingBacklog` is set to false 
immediately when no buckets have backlog.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to