luoyuxia commented on code in PR #2920:
URL: https://github.com/apache/fluss/pull/2920#discussion_r3208660681
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/FailedTieringEvent.java:
##########
@@ -19,25 +19,48 @@
import org.apache.flink.api.connector.source.SourceEvent;
-/** SourceEvent used to represent a Fluss table is failed during tiering. */
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/** SourceEvent used to represent a Fluss table has failed during tiering. */
public class FailedTieringEvent implements SourceEvent {
+ /** The type of tiering failure. */
+ public enum FailureType {
+ TABLE_DROPPED,
+ COMMIT_FAILURE
+ }
+
private static final long serialVersionUID = 1L;
private final long tableId;
- private final String failReason;
+ private final FailureType failureType;
+
+ private final String failureMessage;
+
+ public FailedTieringEvent(long tableId, String failureMessage) {
+ this(tableId, FailureType.COMMIT_FAILURE, failureMessage);
+ }
- public FailedTieringEvent(long tableId, String failReason) {
+ public FailedTieringEvent(long tableId, FailureType failureType, String failureMessage) {
this.tableId = tableId;
- this.failReason = failReason;
+ this.failureType = checkNotNull(failureType);
+ this.failureMessage = failureMessage;
}
public long getTableId() {
return tableId;
}
- public String failReason() {
- return failReason;
+ public FailureType getFailureType() {
+ return failureType;
+ }
+
+ public String getFailureMessage() {
+ return failureMessage;
+ }
+
+ public boolean isCancelled() {
+ return failureType == FailureType.TABLE_DROPPED;
Review Comment:
Ditto. `isCancelled` and `failureType == TABLE_DROPPED` are not the same thing semantically.
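A minimal sketch of reason-specific naming, assuming the event otherwise keeps the shape shown in the diff. `FailedTieringEventSketch` and `isTableDropped()` are hypothetical names. Flink's `SourceEvent` interface is left out so the snippet compiles standalone, and `Objects.requireNonNull` stands in for Fluss's `Preconditions.checkNotNull`.

```java
import java.util.Objects;

// Hypothetical, trimmed-down version of FailedTieringEvent (SourceEvent omitted).
class FailedTieringEventSketch {
    enum FailureType {
        TABLE_DROPPED,
        COMMIT_FAILURE
    }

    private final long tableId;
    private final FailureType failureType;
    private final String failureMessage;

    FailedTieringEventSketch(long tableId, FailureType failureType, String failureMessage) {
        this.tableId = tableId;
        // Stand-in for Fluss's Preconditions.checkNotNull.
        this.failureType = Objects.requireNonNull(failureType, "failureType");
        this.failureMessage = failureMessage;
    }

    long getTableId() {
        return tableId;
    }

    FailureType getFailureType() {
        return failureType;
    }

    String getFailureMessage() {
        return failureMessage;
    }

    // Named after the concrete cause rather than a generic "cancelled" state.
    boolean isTableDropped() {
        return failureType == FailureType.TABLE_DROPPED;
    }
}
```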
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java:
##########
@@ -151,6 +151,27 @@ public void processElement(StreamRecord<TableBucketWriteResult<WriteResult>> str
collectTableAllBucketWriteResult(tableId);
if (committableWriteResults != null) {
+ // Check if any result is cancelled (table was dropped)
+ boolean isCancelled =
+ committableWriteResults.stream().anyMatch(TableBucketWriteResult::isCancelled);
+ if (isCancelled) {
+ LOG.info(
+ "Skipping commit for dropped table {}, table path {}.",
+ tableId,
+ tableBucketWriteResult.tablePath());
+ collectedTableBucketWriteResults.remove(tableId);
+ // Notify the enumerator that this table's tiering is cancelled (dropped)
+ // via FailedTieringEvent with TABLE_DROPPED type.
+ operatorEventGateway.sendEventToCoordinator(
+ new SourceEventWrapper(
+ new FailedTieringEvent(
+ tableId,
+ FailedTieringEvent.FailureType.TABLE_DROPPED,
Review Comment:
`isCancelled` and `TABLE_DROPPED` are not the same thing semantically. `isCancelled` describes the state of the write result, while `TABLE_DROPPED` is a specific cancellation reason.
The current code upgrades any cancelled result into `FailedTieringEvent(TABLE_DROPPED)`, which makes the control-flow reason more specific than the data it is derived from.
A simpler fix here might be to rename `cancelled` to something reason-specific, such as `tableDropped` / `isTableDropped()`. My main concern is that "cancelled" is a generic state, while `TABLE_DROPPED` is a specific cause, so the current mapping feels semantically too broad.
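A rough sketch of the rename on the commit side, assuming the write result carries a reason-specific flag. `BucketWriteResultSketch`, `CommitDecisionSketch`, and the `Decision` enum are hypothetical stand-ins for `TableBucketWriteResult` and the branch in `processElement`; the point is that `TABLE_DROPPED` is only derived from data that already encodes that cause.

```java
import java.util.List;

// Hypothetical stand-in for TableBucketWriteResult: the flag records the cause
// (table dropped) instead of a generic "cancelled" state.
class BucketWriteResultSketch {
    private final int bucket;
    private final boolean tableDropped;

    BucketWriteResultSketch(int bucket, boolean tableDropped) {
        this.bucket = bucket;
        this.tableDropped = tableDropped;
    }

    int bucket() {
        return bucket;
    }

    boolean isTableDropped() {
        return tableDropped;
    }
}

class CommitDecisionSketch {
    enum Decision {
        COMMIT,
        SKIP_TABLE_DROPPED
    }

    // The skip reason comes from a flag that already means "table dropped",
    // so the control-flow reason is never more specific than its input.
    static Decision decide(List<BucketWriteResultSketch> results) {
        boolean tableDropped =
                results.stream().anyMatch(BucketWriteResultSketch::isTableDropped);
        return tableDropped ? Decision.SKIP_TABLE_DROPPED : Decision.COMMIT;
    }
}
```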
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -357,6 +383,16 @@ private void generateAndAssignSplits(
if (throwable != null) {
LOG.warn("Failed to request tiering table, will retry later.", throwable);
}
+
+ // Process dropped table IDs collected during heartbeat processing.
+ if (!pendingDroppedTableIds.isEmpty()) {
+ List<Long> droppedIds = new ArrayList<>(pendingDroppedTableIds);
+ pendingDroppedTableIds.clear();
+ for (long droppedTableId : droppedIds) {
+ handleTableDropped(droppedTableId);
Review Comment:
It looks strange to me that `handleTableDropped` is called in `generateAndAssignSplits`. Can we decouple it from `generateAndAssignSplits`? Once we find that a tiered table has been dropped, we can handle it directly with `context.runInCoordinatorThread`.
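A rough sketch of the decoupling, assuming a dropped table can be detected wherever heartbeats are processed. `CoordinatorContextSketch` is a hypothetical one-method stand-in for Flink's `SplitEnumeratorContext#runInCoordinatorThread(Runnable)`, and the map is a simplified version of the enumerator state. Dropped-table handling is scheduled on its own, not as a side effect of `generateAndAssignSplits`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal stand-in for SplitEnumeratorContext#runInCoordinatorThread.
interface CoordinatorContextSketch {
    void runInCoordinatorThread(Runnable runnable);
}

class DroppedTableHandlerSketch {
    private final CoordinatorContextSketch context;
    // Simplified enumerator state; only mutated via the coordinator thread.
    private final Map<Long, Long> tieringTableEpochs = new HashMap<>();

    DroppedTableHandlerSketch(CoordinatorContextSketch context) {
        this.context = context;
    }

    void startTiering(long tableId, long epoch) {
        context.runInCoordinatorThread(() -> tieringTableEpochs.put(tableId, epoch));
    }

    // Called as soon as a dropped table is detected; independent of split assignment.
    void onTableDropped(long tableId) {
        context.runInCoordinatorThread(() -> handleTableDropped(tableId));
    }

    private void handleTableDropped(long tableId) {
        Long epoch = tieringTableEpochs.remove(tableId);
        if (epoch == null) {
            return; // not currently tiered, nothing to clean up
        }
        // Real code would cancel pending splits / notify readers here (omitted).
    }

    boolean isTiering(long tableId) {
        return tieringTableEpochs.containsKey(tableId);
    }
}
```

Passing `Runnable::run` as the context runs the handler synchronously, which is convenient in a unit test; a real Flink context would queue it on the coordinator thread, keeping all state mutation single-threaded.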
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -284,18 +305,23 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
if (sourceEvent instanceof FailedTieringEvent) {
FailedTieringEvent failedEvent = (FailedTieringEvent) sourceEvent;
long failedTableId = failedEvent.getTableId();
- Long tieringEpoch = tieringTableEpochs.remove(failedTableId);
LOG.info(
"Tiering table {} is failed, fail reason is {}.",
failedTableId,
- failedEvent.failReason());
+ failedEvent.getFailureMessage());
+ Long tieringEpoch = markTableAsFailed(failedTableId);
Review Comment:
Why extract `markTableAsFailed` into a separate method? I feel the extraction isn't necessary and makes the code logic harder to follow.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -284,18 +305,23 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
if (sourceEvent instanceof FailedTieringEvent) {
FailedTieringEvent failedEvent = (FailedTieringEvent) sourceEvent;
long failedTableId = failedEvent.getTableId();
- Long tieringEpoch = tieringTableEpochs.remove(failedTableId);
LOG.info(
"Tiering table {} is failed, fail reason is {}.",
failedTableId,
- failedEvent.failReason());
+ failedEvent.getFailureMessage());
+ Long tieringEpoch = markTableAsFailed(failedTableId);
if (tieringEpoch == null) {
- // shouldn't happen, warn it
- LOG.warn(
"The failed table {} is not in tiering table, won't report it to Fluss to mark as failed.",
- failedTableId);
- } else {
- failedTableEpochs.put(failedTableId, tieringEpoch);
+ if (!failedEvent.isCancelled()) {
Review Comment:
This branch looks over-specialized, since it only differs in logging. For simplicity, we could use a single log path when `tieringEpoch == null` instead of distinguishing cancelled vs. non-cancelled events here.
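A possible shape for the failed-event handling with the epoch removal inlined and a single log path when `tieringEpoch == null`. This is a simplified sketch: the map fields mirror `tieringTableEpochs` / `failedTableEpochs` from the diff, `FailedEventHandlingSketch` is a hypothetical name, and `java.util.logging` replaces SLF4J so it compiles without dependencies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

class FailedEventHandlingSketch {
    private static final Logger LOG =
            Logger.getLogger(FailedEventHandlingSketch.class.getName());

    final Map<Long, Long> tieringTableEpochs = new HashMap<>();
    final Map<Long, Long> failedTableEpochs = new HashMap<>();

    void handleFailedTable(long failedTableId, String failureMessage) {
        LOG.info("Tiering table " + failedTableId + " is failed, fail reason is " + failureMessage + ".");
        // Inlined here instead of a separate markTableAsFailed helper.
        Long tieringEpoch = tieringTableEpochs.remove(failedTableId);
        if (tieringEpoch == null) {
            // Single log path: no distinction between dropped and other failures.
            LOG.warning("The failed table " + failedTableId
                    + " is not in tiering table, won't report it to Fluss to mark as failed.");
        } else {
            failedTableEpochs.put(failedTableId, tieringEpoch);
        }
    }
}
```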
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java:
##########
@@ -70,8 +70,18 @@ public RecordWriter(
public abstract void write(LogRecord record) throws Exception;
+ /**
+ * Completes the write process and returns the commit message.
+ *
+ * @return the commit message, or null if no data was written (empty write scenario)
+ */
+ @Nullable
CommitMessage complete() throws Exception {
List<CommitMessage> commitMessages = tableWrite.prepareCommit();
+ if (commitMessages.isEmpty()) {
+ // No data was written, return null to indicate empty write
+ return null;
Review Comment:
When will it return null? If the returned `CommitMessage` is `@Nullable`, we will also need to mark `commitMessage` in `PaimonWriteResult` as `@Nullable`. I think we can revert this change.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java:
##########
@@ -151,6 +151,27 @@ public void processElement(StreamRecord<TableBucketWriteResult<WriteResult>> str
collectTableAllBucketWriteResult(tableId);
if (committableWriteResults != null) {
+ // Check if any result is cancelled (table was dropped)
+ boolean isCancelled =
Review Comment:
Actually, we will also need to abort the committable to clean up the staging data files. I'm not sure whether that change is big; if it is large, maybe you can leave a TODO.
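A hedged sketch of aborting the committable for a dropped table. `LakeCommitterSketch` and its `toCommittable` / `abort` methods are hypothetical stand-ins, and the actual Fluss lake committer interface may differ. The abort is treated as best-effort here, so a cleanup failure is logged instead of failing the job; whether that is the right policy is left as a TODO.

```java
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical committer abstraction; not the real Fluss API.
interface LakeCommitterSketch<W, C> {
    C toCommittable(List<W> writeResults) throws Exception;

    void abort(C committable) throws Exception;
}

class DroppedTableCleanupSketch {
    private static final Logger LOG =
            Logger.getLogger(DroppedTableCleanupSketch.class.getName());

    // Best-effort cleanup of staged data files; returns true if the abort succeeded.
    static <W, C> boolean abortForDroppedTable(
            long tableId, List<W> writeResults, LakeCommitterSketch<W, C> committer) {
        try {
            C committable = committer.toCommittable(writeResults);
            committer.abort(committable);
            return true;
        } catch (Exception e) {
            // TODO: decide whether a failed cleanup should be retried or only logged.
            LOG.log(Level.WARNING, "Failed to abort staged data for dropped table " + tableId, e);
            return false;
        }
    }
}
```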
--
This is an automated message from the Apache Git Service.