gaoyunhaii commented on code in PR #21546:
URL: https://github.com/apache/flink/pull/21546#discussion_r1066547072
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -443,33 +445,53 @@ public RetryableResultHandlerDelegator(
}
public void registerTimeout(long timeout) {
- resultHandler.registerTimeout(processingTimeService, timeout);
+ // must overwrite the registerTimeout here to control the callback logic
+ registerTimeout(processingTimeService, timeout, resultHandler);
+ }
+
+ private void registerTimeout(
Review Comment:
Could this be extracted into a utility method?
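For illustration only, a minimal sketch of what such a utility might look like. `TimeoutTimers` and `TimeSource` are hypothetical names, not part of Flink; `TimeSource` stands in for the two `ProcessingTimeService` calls used in the diff (`getCurrentProcessingTime` and `registerTimer`) so that the snippet compiles on its own.

```java
import java.util.function.LongConsumer;

/** Hypothetical helper, not part of Flink: computes the deadline and registers the timer. */
class TimeoutTimers {

    /**
     * Assumed stand-in for ProcessingTimeService, reduced to the two calls the diff uses.
     * T is whatever timer handle the service returns.
     */
    interface TimeSource<T> {
        long getCurrentProcessingTime();

        T registerTimer(long timestamp, LongConsumer callback);
    }

    /** Registers a timer that fires `timeout` ms after the current processing time. */
    static <T> T registerTimeout(TimeSource<T> timeSource, long timeout, LongConsumer onTimeout) {
        final long timeoutTimestamp = timeout + timeSource.getCurrentProcessingTime();
        return timeSource.registerTimer(timeoutTimestamp, onTimeout);
    }
}
```

With a helper of this shape, both the plain handler and the retry delegator could pass their own callback and store the returned handle themselves.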
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -443,33 +445,53 @@ public RetryableResultHandlerDelegator(
}
public void registerTimeout(long timeout) {
- resultHandler.registerTimeout(processingTimeService, timeout);
+ // must overwrite the registerTimeout here to control the callback logic
+ registerTimeout(processingTimeService, timeout, resultHandler);
+ }
+
+ private void registerTimeout(
+ ProcessingTimeService processingTimeService,
+ long timeout,
+ ResultHandler resultHandler) {
+ final long timeoutTimestamp =
+ timeout + processingTimeService.getCurrentProcessingTime();
+
+ resultHandler.timeoutTimer =
+ processingTimeService.registerTimer(
+ timeoutTimestamp, timestamp -> timerTriggered());
+ }
+
+ /** Rewrite the timeout process to deal with retry state. */
+ private void timerTriggered() throws Exception {
+ if (!resultHandler.completed.get()) {
+ // cancel delayed retry timer first
+ if (delayedRetryTimer != null) {
+ delayedRetryTimer.cancel(true);
+ }
+ // force reset retryAwaiting to prevent the handler to trigger retry unnecessarily
+ retryAwaiting.set(false);
+
+ userFunction.timeout(resultHandler.inputRecord.getValue(), this);
+ }
}
@Override
public void complete(Collection<OUT> results) {
Preconditions.checkNotNull(
results, "Results must not be null, use empty collection to emit nothing");
if (!retryDisabledOnFinish.get() && resultHandler.inputRecord.isRecord()) {
- // ignore repeated call(s)
- if (!retryAwaiting.compareAndSet(false, true)) {
- return;
- }
-
processRetryInMailBox(results, null);
} else {
+ if (delayedRetryTimer != null) {
Review Comment:
Should we also cancel the timer in `completeExceptionally`?
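To make the question concrete, here is a minimal sketch in which both completion paths share one cancellation helper. `RetryHandlerSketch` and its members are hypothetical and only mirror the field name `delayedRetryTimer` from the diff; this is not the actual `AsyncWaitOperator` code.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical, simplified handler: not the real RetryableResultHandlerDelegator. */
class RetryHandlerSketch {
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private ScheduledFuture<?> delayedRetryTimer;
    private Throwable failure;

    void setDelayedRetryTimer(ScheduledFuture<?> timer) {
        this.delayedRetryTimer = timer;
    }

    /** Normal completion: cancel any pending delayed retry. */
    void complete() {
        if (completed.compareAndSet(false, true)) {
            cancelDelayedRetryTimer();
        }
    }

    /** Exceptional completion: record the error and cancel the pending retry as well. */
    void completeExceptionally(Throwable error) {
        if (completed.compareAndSet(false, true)) {
            failure = error;
            cancelDelayedRetryTimer();
        }
    }

    /** Shared by both paths so that neither can leave a retry timer behind. */
    private void cancelDelayedRetryTimer() {
        if (delayedRetryTimer != null) {
            delayedRetryTimer.cancel(true);
            delayedRetryTimer = null;
        }
    }

    boolean isCompleted() {
        return completed.get();
    }

    Throwable getFailure() {
        return failure;
    }
}
```

Routing both paths through one helper keeps the cancellation logic in a single place, so a later change to one path cannot silently skip it.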