This is an automated email from the ASF dual-hosted git repository. xushiyan pushed a commit to branch release-0.11.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 89a27a1ccf4bdc32c60158aad3ca6e8a7068c206
Author: Danny Chan <[email protected]>
AuthorDate: Wed Apr 20 12:48:24 2022 +0800

    [HUDI-3917] Flink write task hangs if last checkpoint has no data input (#5360)
---
 .../hudi/sink/StreamWriteOperatorCoordinator.java  | 31 ++++++++++++++++++++++
 .../hudi/sink/append/AppendWriteFunction.java      |  2 +-
 .../sink/common/AbstractStreamWriteFunction.java   |  9 ++++++-
 3 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index b5ec08a583..023b1e6965 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.meta.CkpMetadata;
 import org.apache.hudi.sink.utils.HiveSyncContext;
@@ -42,6 +43,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -429,6 +431,31 @@ public class StreamWriteOperatorCoordinator
     addEventToBuffer(event);
   }
 
+  /**
+   * The coordinator reuses the instant if there is no data for this round of checkpoint,
+   * sends the commit ack events to unblock the flushing.
+   */
+  private void sendCommitAckEvents(long checkpointId) {
+    CompletableFuture<?>[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull)
+        .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId)))
+        .toArray(CompletableFuture<?>[]::new);
+    CompletableFuture.allOf(futures).whenComplete((resp, error) -> {
+      if (!sendToFinishedTasks(error)) {
+        throw new HoodieException("Error while waiting for the commit ack events to finish sending", error);
+      }
+    });
+  }
+
+  /**
+   * Decides whether the given exception is caused by sending events to FINISHED tasks.
+   *
+   * <p>Ugly impl: the exception may change in the future.
+   */
+  private static boolean sendToFinishedTasks(Throwable throwable) {
+    return throwable.getCause() instanceof TaskNotRunningException
+        || throwable.getCause().getMessage().contains("running");
+  }
+
   /**
    * Commits the instant.
    */
@@ -456,6 +483,10 @@ public class StreamWriteOperatorCoordinator
     if (writeResults.size() == 0) {
       // No data has written, reset the buffer and returns early
       reset();
+      // Send commit ack event to the write function to unblock the flushing
+      // If this checkpoint has no inputs while the next checkpoint has inputs,
+      // the 'isConfirming' flag should be switched with the ack event.
+      sendCommitAckEvents(checkpointId);
       return false;
     }
     doCommit(instant, writeResults);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index a72b885a22..7b40718b35 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -123,9 +123,9 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
       writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
       instant = this.writerHelper.getInstantTime();
     } else {
-      LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
       writeStatus = Collections.emptyList();
       instant = instantToWrite(false);
+      LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, instant);
     }
     final WriteMetadataEvent event = WriteMetadataEvent.builder()
         .taskID(taskID)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index 4e8712b661..98085fa74f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -247,7 +247,7 @@ public abstract class AbstractStreamWriteFunction<I>
       // wait condition:
       // 1. there is no inflight instant
       // 2. the inflight instant does not change and the checkpoint has buffering data
-      if (instant == null || (instant.equals(this.currentInstant) && hasData && !this.ckpMetadata.isAborted(instant))) {
+      if (instant == null || invalidInstant(instant, hasData)) {
         // sleep for a while
         timeWait.waitFor();
         // refresh the inflight instant
@@ -260,4 +260,11 @@ public abstract class AbstractStreamWriteFunction<I>
     }
     return instant;
   }
+
+  /**
+   * Returns whether the pending instant is invalid to write with.
+   */
+  private boolean invalidInstant(String instant, boolean hasData) {
+    return instant.equals(this.currentInstant) && hasData && !this.ckpMetadata.isAborted(instant);
+  }
 }
