Copilot commented on code in PR #2920:
URL: https://github.com/apache/fluss/pull/2920#discussion_r3208604859
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java:
##########
@@ -70,8 +70,18 @@ public RecordWriter(
public abstract void write(LogRecord record) throws Exception;
+ /**
+ * Completes the write process and returns the commit message.
+ *
+ * @return the commit message, or null if no data was written (empty write scenario)
+ */
+ @Nullable
CommitMessage complete() throws Exception {
List<CommitMessage> commitMessages = tableWrite.prepareCommit();
+ if (commitMessages.isEmpty()) {
+ // No data was written, return null to indicate empty write
+ return null;
+ }
Review Comment:
`RecordWriter.complete()` can now return null, but the current Paimon tiering
flow still wraps the returned `CommitMessage` in a `PaimonWriteResult`, and its
serializer and committer expect a non-null commit message. Empty writes will
therefore likely hit NPEs during commit or serialization. Consider propagating
the empty-write case as a null *write result* (i.e., have the lake writer
return null when `prepareCommit()` is empty), or make the downstream handling
explicitly accept or skip null commit messages.
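
The first option above could look roughly like the sketch below. It is not the
PR's code: `EmptyWriteSketch`, its nested `CommitMessage` and `WriteResult`
types, and the choice to take the first prepared message are all hypothetical
stand-ins, used only to show a null commit message becoming a null write result
instead of being wrapped.

```java
import java.util.List;
import java.util.Objects;

public class EmptyWriteSketch {
    /** Hypothetical stand-in for Paimon's CommitMessage. */
    public interface CommitMessage {}

    /** Hypothetical stand-in for PaimonWriteResult; it rejects null messages. */
    public static final class WriteResult {
        public final CommitMessage commitMessage;

        public WriteResult(CommitMessage commitMessage) {
            this.commitMessage = Objects.requireNonNull(commitMessage, "commitMessage");
        }
    }

    /** Mirrors the reviewed change: null when nothing was prepared. */
    public static CommitMessage complete(List<CommitMessage> prepared) {
        // Assumption: a non-empty result carries the message in its first slot.
        return prepared.isEmpty() ? null : prepared.get(0);
    }

    /** Propagates the empty write as a null write result instead of wrapping null. */
    public static WriteResult toWriteResult(CommitMessage message) {
        return message == null ? null : new WriteResult(message);
    }
}
```

With this shape, the serializer and committer never see a `WriteResult` that
holds a null message; callers only need to skip a null result.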
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -357,6 +383,16 @@ private void generateAndAssignSplits(
if (throwable != null) {
LOG.warn("Failed to request tiering table, will retry later.", throwable);
}
+
+ // Process dropped table IDs collected during heartbeat processing.
+ if (!pendingDroppedTableIds.isEmpty()) {
+ List<Long> droppedIds = new ArrayList<>(pendingDroppedTableIds);
+ pendingDroppedTableIds.clear();
+ for (long droppedTableId : droppedIds) {
+ handleTableDropped(droppedTableId);
+ }
+ }
Review Comment:
`pendingDroppedTableIds` is populated inside
`requestTieringTableSplitsViaHeartBeat()`, which runs via
`context.callAsync(...)` on an async thread, but here it is copied and cleared
in separate operations without synchronizing on the
`Collections.synchronizedList`. This can cause a
ConcurrentModificationException or lost dropped-table IDs. Synchronize around
the copy and clear (or replace the list with a concurrent queue or an atomic
swap) so that processing is thread-safe.
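
A minimal sketch of the synchronized copy and clear, assuming the field really
is a `Collections.synchronizedList`. The `PendingDropDrain` class and its
`recordDropped` and `drain` methods are hypothetical and stand in for the
enumerator's logic. Holding the list's own monitor across both calls means an
ID added by the async thread lands either in this drain or in the next one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PendingDropDrain {
    // Hypothetical stand-in for the enumerator's pendingDroppedTableIds field.
    private final List<Long> pendingDroppedTableIds =
            Collections.synchronizedList(new ArrayList<>());

    /** Called from the async heartbeat thread in this sketch. */
    public void recordDropped(long tableId) {
        pendingDroppedTableIds.add(tableId);
    }

    /** Copies and clears as one atomic step under the list's own monitor. */
    public List<Long> drain() {
        // The synchronized wrapper locks on itself, so this blocks concurrent add().
        synchronized (pendingDroppedTableIds) {
            List<Long> drained = new ArrayList<>(pendingDroppedTableIds);
            pendingDroppedTableIds.clear();
            return drained;
        }
    }
}
```

The caller would then iterate over the returned list and invoke
`handleTableDropped` for each ID outside the lock, which keeps the critical
section short.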