Copilot commented on code in PR #2463:
URL: https://github.com/apache/fluss/pull/2463#discussion_r2739658648
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java:
##########
@@ -594,6 +596,22 @@ void testHandleFailedTieringTableEvent() throws Throwable {
        context.getSplitsAssignmentSequence().clear();
        enumerator.handleSourceEvent(1, new FailedTieringEvent(tableId, "test_reason"));
+        // verify failure event is broadcast to other readers
+        Map<Integer, List<SourceEvent>> sentSourceEvents = context.getSentSourceEvent();
+        assertThat(sentSourceEvents).hasSize(numSubtasks - 1);
+        for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
+            if (subtaskId == 1) {
+                assertThat(sentSourceEvents).doesNotContainKey(subtaskId);
+                continue;
+            }
Review Comment:
This test asserts the failure event is *not* sent to the originating
subtask. In production, `FailedTieringEvent` may be emitted by the committer
(subtask 0) and still needs to be delivered to reader 0; otherwise that reader
won’t clear state / emit markers. Adjust the expectation to broadcast to all
readers (or otherwise ensure the excluded reader is still notified).
```suggestion
        // verify failure event is broadcast to all readers (including the originating one)
        Map<Integer, List<SourceEvent>> sentSourceEvents = context.getSentSourceEvent();
        assertThat(sentSourceEvents).hasSize(numSubtasks);
        for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
```
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -270,6 +274,30 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        }
    }
+    /**
+     * Broadcasts a TableTieringFailedEvent to all readers except the source reader that reported
+     * the failure. This allows other readers to clean up their state for the failed table.
+     *
+     * @param sourceSubtaskId the subtask ID of the reader that reported the failure
+     * @param failedTableId the ID of the failed table
+     * @param failReason the reason for the failure
+     */
+    private void broadcastTableTieringFailedEvent(
+            int sourceSubtaskId, long failedTableId, String failReason) {
+        TableTieringFailedEvent failedEvent =
+                new TableTieringFailedEvent(failedTableId, failReason);
+
+        for (Integer readerSubtaskId : context.registeredReaders().keySet()) {
+            if (readerSubtaskId != sourceSubtaskId) {
Review Comment:
Broadcast currently skips `sourceSubtaskId`. `FailedTieringEvent` can
originate from the committer (parallelism=1, subtask 0) or other operators, so
this exclusion can accidentally prevent reader 0 (or any matching subtask) from
receiving the failure notification and cleaning up. Broadcast to all registered
readers (idempotent handling) rather than excluding the sender.
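A minimal sketch of the suggested behavior, assuming reader-side handling is idempotent; `sendEventToSourceReader` is the standard `SplitEnumeratorContext` method, and the other names mirror the diff above:
```java
// Sketch: notify every registered reader, including the one that reported the
// failure, relying on idempotent reader-side handling instead of exclusion.
private void broadcastTableTieringFailedEvent(long failedTableId, String failReason) {
    TableTieringFailedEvent failedEvent =
            new TableTieringFailedEvent(failedTableId, failReason);
    for (Integer readerSubtaskId : context.registeredReaders().keySet()) {
        context.sendEventToSourceReader(readerSubtaskId, failedEvent);
    }
}
```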
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -441,6 +470,133 @@ private void finishCurrentTable() throws IOException {
        currentTableSplitsByBucket.clear();
    }
+    /** Cleans up the state for tables that have been marked as failed by the Enumerator. */
+    private void cleanUpFailedTables() {
+        if (failedTableIds.isEmpty()) {
+            return;
+        }
+
+        // clean up pending splits for failed tables
+        for (Long failedTableId : failedTableIds) {
+            pendingTieringSplits.remove(failedTableId);
+            pendingTieringTables.remove(failedTableId);
+
Review Comment:
`cleanUpFailedTables()` removes pending splits/tables from the SplitReader's
internal queues, but it does not report those splits as finished via
`RecordsWithSplitIds.finishedSplits()`. That can leave `SourceReaderBase` still
considering those splits assigned and in progress, preventing
`onSplitFinished()` and the subsequent `sendSplitRequest()` from ever being
triggered.
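A minimal sketch of one way to address this, assuming the split IDs are collected while draining the pending queues; `RecordsBySplits` is Flink's stock `RecordsWithSplitIds` implementation from `flink-connector-base`:
```java
import java.util.Collections;
import java.util.Set;

import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;

// Sketch: return a record-less batch whose finishedSplits() carries the IDs of
// the splits dropped during cleanup, so SourceReaderBase releases them.
private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> finishRemovedSplits(
        Set<String> removedSplitIds) {
    return new RecordsBySplits<>(Collections.emptyMap(), removedSplitIds);
}
```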
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -441,6 +470,133 @@ private void finishCurrentTable() throws IOException {
        currentTableSplitsByBucket.clear();
    }
+    /** Cleans up the state for tables that have been marked as failed by the Enumerator. */
+    private void cleanUpFailedTables() {
+        if (failedTableIds.isEmpty()) {
+            return;
+        }
+
+        // clean up pending splits for failed tables
+        for (Long failedTableId : failedTableIds) {
+            pendingTieringSplits.remove(failedTableId);
+            pendingTieringTables.remove(failedTableId);
+
+            // if the failed table is the current table being processed, clean it up
+            if (currentTableId != null && currentTableId.equals(failedTableId)) {
+                LOG.info(
+                        "Cleaning up current table {} (id={}) as it has been marked as failed.",
+                        currentTablePath,
+                        currentTableId);
+                try {
+                    cleanupCurrentTableState();
+                } catch (Exception e) {
+                    LOG.warn(
+                            "Error while cleaning up failed table {} state: {}",
+                            failedTableId,
+                            e.getMessage());
+                }
+            }
+        }
+        failedTableIds.clear();
+    }
+
+    public void notifyTableTieringFailed(long tableId) {
+        LOG.info("Received notification that table {} tiering has failed.", tableId);
+        failedTableIds.add(tableId);
+    }
Review Comment:
`failedTableIds` is a plain `HashSet`, but it is mutated via
`notifyTableTieringFailed()` (called from the SourceReader thread) and
read/cleared in `fetch()` (SplitFetcher thread). This is not thread-safe and
can drop notifications or fail with `ConcurrentModificationException`. Use a
thread-safe structure (e.g., a concurrent set or queue) and avoid the
read-then-`clear()` race by draining with poll semantics.
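A minimal sketch of the drain-based alternative, assuming a `ConcurrentLinkedQueue` replaces the `HashSet`; the surrounding names are from the diff above:
```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch: an unbounded concurrent queue with poll-based draining removes both
// the lost-update risk and the read-then-clear() race of a shared HashSet.
private final ConcurrentLinkedQueue<Long> failedTableIds = new ConcurrentLinkedQueue<>();

public void notifyTableTieringFailed(long tableId) {
    failedTableIds.offer(tableId); // SourceReader thread
}

private void cleanUpFailedTables() {
    Long failedTableId;
    while ((failedTableId = failedTableIds.poll()) != null) { // SplitFetcher thread
        pendingTieringSplits.remove(failedTableId);
        pendingTieringTables.remove(failedTableId);
        // current-table cleanup as in the diff above
    }
}
```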
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -441,6 +470,133 @@ private void finishCurrentTable() throws IOException {
        currentTableSplitsByBucket.clear();
    }
+    /** Cleans up the state for tables that have been marked as failed by the Enumerator. */
+    private void cleanUpFailedTables() {
+        if (failedTableIds.isEmpty()) {
+            return;
+        }
+
+        // clean up pending splits for failed tables
+        for (Long failedTableId : failedTableIds) {
+            pendingTieringSplits.remove(failedTableId);
+            pendingTieringTables.remove(failedTableId);
+
+            // if the failed table is the current table being processed, clean it up
+            if (currentTableId != null && currentTableId.equals(failedTableId)) {
+                LOG.info(
+                        "Cleaning up current table {} (id={}) as it has been marked as failed.",
+                        currentTablePath,
+                        currentTableId);
+                try {
+                    cleanupCurrentTableState();
+                } catch (Exception e) {
+                    LOG.warn(
+                            "Error while cleaning up failed table {} state: {}",
+                            failedTableId,
+                            e.getMessage());
+                }
+            }
+        }
+        failedTableIds.clear();
+    }
+
+    public void notifyTableTieringFailed(long tableId) {
+        LOG.info("Received notification that table {} tiering has failed.", tableId);
+        failedTableIds.add(tableId);
+    }
+
+    /**
+     * Handles an exception that occurred during table tiering.
+     *
+     * @param e the exception that occurred
+     * @return an empty result to indicate no records are available for this fetch cycle
+     */
+    private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> handleTableTieringException(
+            Exception e) {
+        if (currentTableId == null) {
+            // no current table, re-throw the exception
+            LOG.error("Exception occurred but no current table is being processed.", e);
+            throw new RuntimeException(e);
+        }
+
+        LOG.error(
+                "Tiering failed for table {} (id={}). Error: {}. "
+                        + "The table will be marked as failed and skipped.",
+                currentTablePath,
+                currentTableId,
+                e.getMessage(),
+                e);
+
+        // store the failure info in the queue
+        long failedTableId = currentTableId;
+        String failReason = ExceptionUtils.stringifyException(e);
+        failedTableQueue.offer(new FailedTableInfo(failedTableId, failReason));
Review Comment:
`handleTableTieringException` ignores the boolean return value of
`Queue<FailedTableInfo>.offer`, so a rejected offer would drop the failure
record silently.
```suggestion
        boolean enqueued = failedTableQueue.offer(new FailedTableInfo(failedTableId, failReason));
        if (!enqueued) {
            LOG.warn(
                    "Failed to enqueue failed table info for table {} (id={}). "
                            + "The failure reason will not be tracked in failedTableQueue.",
                    currentTablePath,
                    failedTableId);
        }
```
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java:
##########
@@ -145,4 +167,64 @@ public TableBucketWriteResult<WriteResult> deserialize(int version, byte[] seria
                maxTimestamp,
                numberOfWriteResults);
    }
+
+    private TableBucketWriteResult<WriteResult> deserializeV2(byte[] serialized)
+            throws IOException {
+        final DataInputDeserializer in = new DataInputDeserializer(serialized);
+
+        // read failed marker flag
+        boolean isFailedMarker = in.readBoolean();
+
+        if (isFailedMarker) {
+            // deserialize failed marker
+            long tableId = in.readLong();
+            String failReason = null;
+            if (in.readBoolean()) {
+                failReason = in.readUTF();
+            }
+            return TableBucketWriteResult.failedMarker(tableId, failReason);
+        } else {
+            // deserialize table path
+            String databaseName = in.readUTF();
+            String tableName = in.readUTF();
+            TablePath tablePath = new TablePath(databaseName, tableName);
+
+            // deserialize bucket
+            long tableId = in.readLong();
+            Long partitionId = null;
+            String partitionName = null;
+            if (in.readBoolean()) {
+                partitionId = in.readLong();
+                partitionName = in.readUTF();
+            }
+            int bucketId = in.readInt();
+            TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
+
+            // deserialize write result
+            int writeResultLength = in.readInt();
+            WriteResult writeResult;
+            if (writeResultLength >= 0) {
+                byte[] writeResultBytes = new byte[writeResultLength];
+                in.readFully(writeResultBytes);
+                writeResult = writeResultSerializer.deserialize(CURRENT_VERSION, writeResultBytes);
Review Comment:
In v2 deserialization, the nested `writeResultSerializer` is invoked with
`CURRENT_VERSION` (2). `writeResultSerializer` has its own versioning (e.g.,
PaimonWriteResultSerializer expects version=1) so this will throw at runtime.
Deserialize the write-result bytes using the write-result serializer’s version
(or persist its version alongside the bytes).
```suggestion
                writeResult =
                        writeResultSerializer.deserialize(
                                writeResultSerializer.getVersion(), writeResultBytes);
```
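For the alternative mentioned above (persisting the version alongside the bytes), here is a minimal sketch assuming it lives in `TableBucketWriteResultSerializer` next to the existing `writeResultSerializer` field; the helper names are hypothetical:
```java
import java.io.IOException;

import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

// Write side: version-prefix the nested payload so future bumps of the
// write-result format remain readable.
private void writeVersionedResult(DataOutputSerializer out, byte[] writeResultBytes)
        throws IOException {
    out.writeInt(writeResultSerializer.getVersion());
    out.writeInt(writeResultBytes.length);
    out.write(writeResultBytes);
}

// Read side: use the persisted version, not the outer format's CURRENT_VERSION.
private WriteResult readVersionedResult(DataInputDeserializer in) throws IOException {
    int writeResultVersion = in.readInt();
    byte[] bytes = new byte[in.readInt()];
    in.readFully(bytes);
    return writeResultSerializer.deserialize(writeResultVersion, bytes);
}
```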
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -441,6 +470,133 @@ private void finishCurrentTable() throws IOException {
        currentTableSplitsByBucket.clear();
    }
+    /** Cleans up the state for tables that have been marked as failed by the Enumerator. */
+    private void cleanUpFailedTables() {
+        if (failedTableIds.isEmpty()) {
+            return;
+        }
+
+        // clean up pending splits for failed tables
+        for (Long failedTableId : failedTableIds) {
+            pendingTieringSplits.remove(failedTableId);
+            pendingTieringTables.remove(failedTableId);
+
+            // if the failed table is the current table being processed, clean it up
+            if (currentTableId != null && currentTableId.equals(failedTableId)) {
+                LOG.info(
+                        "Cleaning up current table {} (id={}) as it has been marked as failed.",
+                        currentTablePath,
+                        currentTableId);
+                try {
+                    cleanupCurrentTableState();
+                } catch (Exception e) {
+                    LOG.warn(
+                            "Error while cleaning up failed table {} state: {}",
+                            failedTableId,
+                            e.getMessage());
+                }
+            }
+        }
+        failedTableIds.clear();
+    }
+
+    public void notifyTableTieringFailed(long tableId) {
+        LOG.info("Received notification that table {} tiering has failed.", tableId);
+        failedTableIds.add(tableId);
+    }
+
+    /**
+     * Handles an exception that occurred during table tiering.
+     *
+     * @param e the exception that occurred
+     * @return an empty result to indicate no records are available for this fetch cycle
+     */
+    private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> handleTableTieringException(
+            Exception e) {
+        if (currentTableId == null) {
+            // no current table, re-throw the exception
+            LOG.error("Exception occurred but no current table is being processed.", e);
+            throw new RuntimeException(e);
+        }
+
+        LOG.error(
+                "Tiering failed for table {} (id={}). Error: {}. "
+                        + "The table will be marked as failed and skipped.",
+                currentTablePath,
+                currentTableId,
+                e.getMessage(),
+                e);
+
+        // store the failure info in the queue
+        long failedTableId = currentTableId;
+        String failReason = ExceptionUtils.stringifyException(e);
+        failedTableQueue.offer(new FailedTableInfo(failedTableId, failReason));
+
+        // clean up current table state
+        try {
+            cleanupCurrentTableState();
+        } catch (Exception cleanupException) {
+            LOG.warn(
+                    "Error while cleaning up state for failed table {}: {}",
+                    failedTableId,
+                    cleanupException.getMessage());
+        }
+
+        return emptyTableBucketWriteResultWithSplitIds();
+    }
Review Comment:
On exception, `handleTableTieringException(...)` returns an empty
`RecordsWithSplitIds` and clears internal table state, but it doesn’t mark the
affected splits as finished. That means the SourceReader may never invoke
`onSplitFinished()` (and thus never request new splits), potentially stalling
processing after a failure. Consider returning a `RecordsWithSplitIds` that
finishes the current table’s split IDs when failing a table.
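A minimal sketch under the assumption that the current table's split IDs are still reachable when the failure is handled; the `currentTableSplitIds()` helper is hypothetical:
```java
import java.util.Collections;

import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;

// Sketch: when failing a table, finish its split IDs instead of returning an
// empty batch, so onSplitFinished() fires and sendSplitRequest() follows.
private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> failCurrentTableSplits() {
    return new RecordsBySplits<>(Collections.emptyMap(), currentTableSplitIds());
}
```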
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java:
##########
@@ -62,11 +116,78 @@ public void start() {
        }
    }
+    @Override
+    public InputStatus pollNext(ReaderOutput<TableBucketWriteResult<WriteResult>> output)
+            throws Exception {
+        // Check for failed tables and send events to Enumerator
+        processFailedTables();
+
+        // Emit any pending failure markers to the downstream Committer
+        emitFailedMarkersToCommitter(output);
+
+        return super.pollNext(output);
+    }
+
+    /**
+     * Processes any failed tables detected from the SplitReader and sends FailedTieringEvent to the
+     * Enumerator.
+     */
+    private void processFailedTables() {
+        TieringSplitReader.FailedTableInfo failedTable;
+        while ((failedTable = failedTableQueue.poll()) != null) {
+            LOG.info(
+                    "Detected table {} tiering failure, sending FailedTieringEvent to Enumerator. Reason: {}",
+                    failedTable.getTableId(),
+                    failedTable.getFailReason());
+            context.sendSourceEventToCoordinator(
+                    new FailedTieringEvent(failedTable.getTableId(), failedTable.getFailReason()));
+        }
+    }
+
+    /** Emits any pending failure markers to the downstream Committer. */
+    private void emitFailedMarkersToCommitter(
+            ReaderOutput<TableBucketWriteResult<WriteResult>> output) {
+        TableBucketWriteResult<WriteResult> failedMarker;
+        while ((failedMarker = failedMarkersForCommitter.poll()) != null) {
+            LOG.info(
+                    "Emitting failure marker for table {} to downstream Committer.",
+                    failedMarker.tableBucket().getTableId());
+            output.collect(failedMarker);
+        }
+    }
+
    @Override
    protected void onSplitFinished(Map<String, TieringSplitState> finishedSplitIds) {
        context.sendSplitRequest();
    }
+    @Override
+    public void handleSourceEvents(SourceEvent sourceEvent) {
+        if (sourceEvent instanceof TableTieringFailedEvent) {
+            TableTieringFailedEvent failedEvent = (TableTieringFailedEvent) sourceEvent;
+            long failedTableId = failedEvent.getTableId();
+            String failReason = failedEvent.getFailReason();
+
+            LOG.info(
+                    "Received TableTieringFailedEvent from Enumerator for table {}. Reason: {}",
+                    failedTableId,
+                    failReason);
+
+            // Notify the SplitReader to clean up state for the failed table
+            splitReader.notifyTableTieringFailed(failedTableId);
+
Review Comment:
`handleSourceEvents()` calls `splitReader.notifyTableTieringFailed(...)`
directly. The SplitReader runs on the SplitFetcher thread (and already
communicates via concurrent queues), so this cross-thread mutation requires the
SplitReader side to be thread-safe. Once failure notifications are made
thread-safe, prefer signalling through a concurrent queue over mutating
SplitReader state directly from the SourceReader thread.
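A minimal sketch of queue-based signalling between the two threads; the class and method names are illustrative, not from the PR:
```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch: a dedicated signal object shared between SourceReader and SplitReader,
// so the reader thread never touches SplitReader fields directly.
final class FailedTableSignals {
    private final ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<>();

    // called from the SourceReader thread in handleSourceEvents()
    void signalFailed(long tableId) {
        queue.offer(tableId);
    }

    // called from the SplitFetcher thread at the top of fetch(); drain semantics
    // avoid the read-then-clear race of a shared HashSet
    Long pollFailed() {
        return queue.poll();
    }
}
```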
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java:
##########
@@ -62,11 +116,78 @@ public void start() {
        }
    }
+    @Override
+    public InputStatus pollNext(ReaderOutput<TableBucketWriteResult<WriteResult>> output)
+            throws Exception {
+        // Check for failed tables and send events to Enumerator
+        processFailedTables();
+
+        // Emit any pending failure markers to the downstream Committer
+        emitFailedMarkersToCommitter(output);
+
+        return super.pollNext(output);
+    }
+
+    /**
+     * Processes any failed tables detected from the SplitReader and sends FailedTieringEvent to the
+     * Enumerator.
+     */
+    private void processFailedTables() {
+        TieringSplitReader.FailedTableInfo failedTable;
+        while ((failedTable = failedTableQueue.poll()) != null) {
+            LOG.info(
+                    "Detected table {} tiering failure, sending FailedTieringEvent to Enumerator. Reason: {}",
+                    failedTable.getTableId(),
+                    failedTable.getFailReason());
+            context.sendSourceEventToCoordinator(
+                    new FailedTieringEvent(failedTable.getTableId(), failedTable.getFailReason()));
+        }
+    }
+
+    /** Emits any pending failure markers to the downstream Committer. */
+    private void emitFailedMarkersToCommitter(
+            ReaderOutput<TableBucketWriteResult<WriteResult>> output) {
+        TableBucketWriteResult<WriteResult> failedMarker;
+        while ((failedMarker = failedMarkersForCommitter.poll()) != null) {
+            LOG.info(
+                    "Emitting failure marker for table {} to downstream Committer.",
+                    failedMarker.tableBucket().getTableId());
+            output.collect(failedMarker);
+        }
+    }
+
    @Override
    protected void onSplitFinished(Map<String, TieringSplitState> finishedSplitIds) {
        context.sendSplitRequest();
    }
+    @Override
+    public void handleSourceEvents(SourceEvent sourceEvent) {
+        if (sourceEvent instanceof TableTieringFailedEvent) {
+            TableTieringFailedEvent failedEvent = (TableTieringFailedEvent) sourceEvent;
+            long failedTableId = failedEvent.getTableId();
+            String failReason = failedEvent.getFailReason();
+
+            LOG.info(
+                    "Received TableTieringFailedEvent from Enumerator for table {}. Reason: {}",
+                    failedTableId,
+                    failReason);
+
+            // Notify the SplitReader to clean up state for the failed table
+            splitReader.notifyTableTieringFailed(failedTableId);
+
+            // Create a failure marker and queue it for sending to downstream Committer
+            TableBucketWriteResult<WriteResult> failedMarker =
+                    TableBucketWriteResult.failedMarker(failedTableId, failReason);
+            failedMarkersForCommitter.offer(failedMarker);
+            LOG.info(
+                    "Queued failure marker for table {} to be sent to downstream Committer.",
+                    failedTableId);
Review Comment:
`handleSourceEvents` ignores the boolean return value of
`Queue<TableBucketWriteResult<WriteResult>>.offer`, so a rejected offer would
drop the marker silently.
```suggestion
            boolean enqueued = failedMarkersForCommitter.offer(failedMarker);
            if (enqueued) {
                LOG.info(
                        "Queued failure marker for table {} to be sent to downstream Committer.",
                        failedTableId);
            } else {
                LOG.warn(
                        "Failed to queue failure marker for table {}. Reason: {}",
                        failedTableId,
                        failReason);
            }
```
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -124,18 +144,27 @@ public RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> fetch() throws I
        // may read snapshot firstly
        if (currentSnapshotSplitReader != null) {
Review Comment:
The new exception handling only wraps `readBatch()` / `poll()`. Exceptions
during split/table transitions just above (e.g., `checkSplitOrStartNext()` ->
`getOrMoveToTable()` and the table-id check) will still propagate out of
`fetch()` and fail the whole job. Consider extending the try/catch to cover the
table-switching logic as well, and route failures through the same per-table
failure handling path.
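A minimal sketch of the suggested shape, with hypothetical helpers standing in for the existing transition and read logic:
```java
import java.io.IOException;

// Sketch: one try/catch spanning both table switching and reading, so both
// kinds of failure flow through the same per-table handling.
@Override
public RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> fetch() throws IOException {
    cleanUpFailedTables();
    try {
        // hypothetical helper wrapping checkSplitOrStartNext()/getOrMoveToTable()
        // and the table-id check
        prepareCurrentSplitOrTable();
        // hypothetical helper wrapping the existing readBatch()/poll() paths
        return readCurrentBatch();
    } catch (Exception e) {
        return handleTableTieringException(e);
    }
}
```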