Copilot commented on code in PR #2920:
URL: https://github.com/apache/fluss/pull/2920#discussion_r3208604859


##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java:
##########
@@ -70,8 +70,18 @@ public RecordWriter(
 
     public abstract void write(LogRecord record) throws Exception;
 
+    /**
+     * Completes the write process and returns the commit message.
+     *
+     * @return the commit message, or null if no data was written (empty write 
scenario)
+     */
+    @Nullable
     CommitMessage complete() throws Exception {
         List<CommitMessage> commitMessages = tableWrite.prepareCommit();
+        if (commitMessages.isEmpty()) {
+            // No data was written, return null to indicate empty write
+            return null;
+        }

Review Comment:
   `RecordWriter.complete()` can now return null, but current Paimon tiering 
flow still wraps the returned `CommitMessage` into `PaimonWriteResult` (and its 
serializer/committer expects a non-null commit message). This will likely lead 
to NPEs during commit or serialization for empty writes. Consider propagating 
the empty-write case as a null *write result* (i.e., have the lake writer 
return null when `prepareCommit()` is empty) or make downstream handling 
explicitly accept/skip null commit messages.
   



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -357,6 +383,16 @@ private void generateAndAssignSplits(
         if (throwable != null) {
             LOG.warn("Failed to request tiering table, will retry later.", 
throwable);
         }
+
+        // Process dropped table IDs collected during heartbeat processing.
+        if (!pendingDroppedTableIds.isEmpty()) {
+            List<Long> droppedIds = new ArrayList<>(pendingDroppedTableIds);
+            pendingDroppedTableIds.clear();
+            for (long droppedTableId : droppedIds) {
+                handleTableDropped(droppedTableId);
+            }
+        }

Review Comment:
   `pendingDroppedTableIds` is populated inside 
`requestTieringTableSplitsViaHeartBeat()` which is executed via 
`context.callAsync(...)` (async thread), but here it is copied and cleared via 
multiple operations without synchronizing on the 
`Collections.synchronizedList`. This can cause ConcurrentModificationException 
or lost dropped-table IDs. Synchronize around the copy+clear (or replace with a 
concurrent queue/atomic swap) so processing is thread-safe.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to