gaoyunhaii commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r919556536
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -128,6 +156,15 @@ public AsyncWaitOperator(
this.timeout = timeout;
+ this.asyncRetryStrategy = asyncRetryStrategy;
+
+ this.retryEnabled =
+ // construct from utility class
+ asyncRetryStrategy != NO_RETRY_STRATEGY
+ // construct from api
+ || asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()
Review Comment:
Should it be
`asyncRetryStrategy != NO_RETRY_STRATEGY && (asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent() || asyncRetryStrategy.getRetryPredicate().exceptionPredicate().isPresent())`?
Otherwise, if a user constructs a new strategy whose predicate methods both return empty, retry should indeed not be enabled, right?
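In block form, a sketch of the intended condition (assuming the assignment stays in the constructor, as in the diff above):
```
// sketch only: enable retry only when a real strategy is configured AND it can
// actually decide on retries via at least one present predicate
this.retryEnabled =
        asyncRetryStrategy != NO_RETRY_STRATEGY
                && (asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()
                        || asyncRetryStrategy.getRetryPredicate().exceptionPredicate().isPresent());
```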
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -154,6 +190,14 @@ public void setup(
default:
throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
}
+ if (asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()) {
Review Comment:
I'd prefer that we not leave `retryResultPredicate` / `retryExceptionPredicate` as null, to avoid possible errors. It could be
```
retryResultPredicate =
        asyncRetryStrategy.getRetryPredicate().resultPredicate().orElse(ignored -> false);
retryExceptionPredicate =
        asyncRetryStrategy.getRetryPredicate().exceptionPredicate().orElse(ignored -> false);
```
Then we could also simplify the test to
```
boolean satisfy =
        (null != results && retryResultPredicate.test(results))
                || (null != error && retryExceptionPredicate.test(error));
```
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
}
}
+ /**
+ * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+ * in-flight flag and remove retry handler from the incomplete retry handlers.
+ */
+ private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+ throws Exception {
+ doRetry(resultHandlerDelegator);
+
+ // reset retryInFlight for next possible retry
+ resultHandlerDelegator.retryInFlight.set(false);
+ // remove from incomplete retry handlers
+ inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+ }
+
+ /** Increments number of attempts and fire the attempt. */
+ private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+ // increment current attempt number
+ resultHandlerDelegator.currentAttempts++;
+
+ // fire a new attempt
+ userFunction.asyncInvoke(
+ resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+ resultHandlerDelegator);
+ }
+
+ /** A delegator holds the real {@link ResultHandler} to handle retries. */
+ private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+ private final ResultHandler resultHandler;
+ private final ProcessingTimeService processingTimeService;
+
+ private ScheduledFuture<?> delayedRetryTimer;
+
+ /** start from 1, when this entry created, the first attempt will happen. */
+ private int currentAttempts = 1;
+
+ /**
+ * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+ * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+ * retry request if true. And wil be reset to false after the retry fired.
+ */
+ private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+ public RetryableResultHandlerDelegator(
+ StreamRecord<IN> inputRecord,
+ ResultFuture<OUT> resultFuture,
+ ProcessingTimeService processingTimeService) {
+ this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+ this.processingTimeService = processingTimeService;
+ }
+
+ public void registerTimeout(long timeout) {
+ resultHandler.registerTimeout(processingTimeService, timeout);
+ }
+
+ @Override
+ public void complete(Collection<OUT> results) {
+ Preconditions.checkNotNull(
+ results, "Results must not be null, use empty collection
to emit nothing");
+ if (retryEnabled
+ && delayedRetryAvailable.get()
+ && resultHandler.inputRecord.isRecord()) {
+ // ignore repeated call(s)
+ if (!retryInFlight.compareAndSet(false, true)) {
+ return;
+ }
+
+ processRetryInMailBox(results, null);
+ } else {
+ resultHandler.complete(results);
+ }
+ }
+
+ private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+ mailboxExecutor.submit(
+ () -> processRetry(this, results, error), "delayed retry or complete");
+ }
+
+ private void processRetry(
+ RetryableResultHandlerDelegator resultHandlerDelegator,
+ Collection<OUT> results,
+ Throwable error) {
+ boolean satisfy = false;
+ if (null != results && null != retryResultPredicate) {
+ satisfy = (satisfy || retryResultPredicate.test(results));
+ }
+ if (null != error && null != retryExceptionPredicate) {
+ satisfy = (satisfy || retryExceptionPredicate.test(error));
+ }
+ if (satisfy) {
+ if (asyncRetryStrategy.canRetry(currentAttempts)) {
+ long nextBackoffTimeMillis =
+         asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
+ if (delayedRetryAvailable.get()) {
+ final long delayedRetry =
+         nextBackoffTimeMillis
+                 + getProcessingTimeService().getCurrentProcessingTime();
+
+ // timer thread will finally dispatch the task to mailbox executor,
+ // and it can only be submitted once for one attempt.
+ delayedRetryTimer =
+         processingTimeService.registerTimer(
+                 delayedRetry,
+                 timestamp -> doRetryWithCleanup(resultHandlerDelegator));
+
+ // add to incomplete retry handlers, will remove it after retry fired
+ inFlightDelayRetryHandlers.add(resultHandlerDelegator);
+
+ return;
Review Comment:
nit: the `return` here seems a bit non-intuitive. Could we remove the nested
conditions with something like the following?
```
if (satisfy
        && asyncRetryStrategy.canRetry(currentAttempts)
        && delayedRetryAvailable.get()) {
    ...
}
```
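For illustration, the flattened flow could look roughly like this (a sketch only, reusing the existing fields and the completion branch from the end of `processRetry`):
```
if (satisfy
        && asyncRetryStrategy.canRetry(currentAttempts)
        && delayedRetryAvailable.get()) {
    long nextBackoffTimeMillis = asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
    long delayedRetry =
            nextBackoffTimeMillis + getProcessingTimeService().getCurrentProcessingTime();

    // timer thread will finally dispatch the task to the mailbox executor,
    // and it can only be submitted once per attempt
    delayedRetryTimer =
            processingTimeService.registerTimer(
                    delayedRetry, timestamp -> doRetryWithCleanup(resultHandlerDelegator));

    // will be removed after the retry is fired
    inFlightDelayRetryHandlers.add(resultHandlerDelegator);
} else if (null != results) {
    resultHandlerDelegator.resultHandler.complete(results);
} else {
    resultHandlerDelegator.resultHandler.completeExceptionally(error);
}
```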
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
}
}
+ /**
+ * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+ * in-flight flag and remove retry handler from the incomplete retry handlers.
+ */
+ private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+ throws Exception {
+ doRetry(resultHandlerDelegator);
+
+ // reset retryInFlight for next possible retry
+ resultHandlerDelegator.retryInFlight.set(false);
+ // remove from incomplete retry handlers
+ inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+ }
+
+ /** Increments number of attempts and fire the attempt. */
+ private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+ // increment current attempt number
+ resultHandlerDelegator.currentAttempts++;
+
+ // fire a new attempt
+ userFunction.asyncInvoke(
+ resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+ resultHandlerDelegator);
+ }
+
+ /** A delegator holds the real {@link ResultHandler} to handle retries. */
+ private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+ private final ResultHandler resultHandler;
+ private final ProcessingTimeService processingTimeService;
+
+ private ScheduledFuture<?> delayedRetryTimer;
+
+ /** start from 1, when this entry created, the first attempt will happen. */
+ private int currentAttempts = 1;
+
+ /**
+ * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+ * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+ * retry request if true. And wil be reset to false after the retry fired.
+ */
+ private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+ public RetryableResultHandlerDelegator(
+ StreamRecord<IN> inputRecord,
+ ResultFuture<OUT> resultFuture,
+ ProcessingTimeService processingTimeService) {
+ this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+ this.processingTimeService = processingTimeService;
+ }
+
+ public void registerTimeout(long timeout) {
+ resultHandler.registerTimeout(processingTimeService, timeout);
+ }
+
+ @Override
+ public void complete(Collection<OUT> results) {
+ Preconditions.checkNotNull(
+ results, "Results must not be null, use empty collection
to emit nothing");
+ if (retryEnabled
+ && delayedRetryAvailable.get()
+ && resultHandler.inputRecord.isRecord()) {
+ // ignore repeated call(s)
+ if (!retryInFlight.compareAndSet(false, true)) {
+ return;
+ }
+
+ processRetryInMailBox(results, null);
+ } else {
+ resultHandler.complete(results);
+ }
+ }
+
+ private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+ mailboxExecutor.submit(
+ () -> processRetry(this, results, error), "delayed retry or complete");
+ }
+
+ private void processRetry(
+ RetryableResultHandlerDelegator resultHandlerDelegator,
+ Collection<OUT> results,
+ Throwable error) {
+ boolean satisfy = false;
+ if (null != results && null != retryResultPredicate) {
+ satisfy = (satisfy || retryResultPredicate.test(results));
+ }
+ if (null != error && null != retryExceptionPredicate) {
+ satisfy = (satisfy || retryExceptionPredicate.test(error));
+ }
+ if (satisfy) {
+ if (asyncRetryStrategy.canRetry(currentAttempts)) {
+ long nextBackoffTimeMillis =
+         asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
+ if (delayedRetryAvailable.get()) {
+ final long delayedRetry =
+         nextBackoffTimeMillis
+                 + getProcessingTimeService().getCurrentProcessingTime();
+
+ // timer thread will finally dispatch the task to mailbox executor,
+ // and it can only be submitted once for one attempt.
+ delayedRetryTimer =
+         processingTimeService.registerTimer(
+                 delayedRetry,
+                 timestamp -> doRetryWithCleanup(resultHandlerDelegator));
+
+ // add to incomplete retry handlers, will remove it after retry fired
Review Comment:
Is it necessary to remove / add it each time? Perhaps we could add it only once, on the first retry, and remove it only when we stop retrying?
Also, it seems we currently do not remove the handler when the retry stops, do we?
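Illustratively, the bookkeeping could then become something like (a sketch only, assuming `inFlightDelayRetryHandlers` supports `contains`):
```
// in the branch that schedules a delayed retry: register the handler only once
if (!inFlightDelayRetryHandlers.contains(resultHandlerDelegator)) {
    inFlightDelayRetryHandlers.add(resultHandlerDelegator);
}

// in the branch that gives up retrying: deregister before completing
inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
if (null != results) {
    resultHandlerDelegator.resultHandler.complete(results);
} else {
    resultHandlerDelegator.resultHandler.completeExceptionally(error);
}
```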
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
}
}
+ /**
+ * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+ * in-flight flag and remove retry handler from the incomplete retry handlers.
+ */
+ private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+ throws Exception {
+ doRetry(resultHandlerDelegator);
+
+ // reset retryInFlight for next possible retry
+ resultHandlerDelegator.retryInFlight.set(false);
+ // remove from incomplete retry handlers
+ inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+ }
+
+ /** Increments number of attempts and fire the attempt. */
+ private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+ // increment current attempt number
+ resultHandlerDelegator.currentAttempts++;
+
+ // fire a new attempt
+ userFunction.asyncInvoke(
+ resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+ resultHandlerDelegator);
+ }
+
+ /** A delegator holds the real {@link ResultHandler} to handle retries. */
+ private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+ private final ResultHandler resultHandler;
+ private final ProcessingTimeService processingTimeService;
+
+ private ScheduledFuture<?> delayedRetryTimer;
+
+ /** start from 1, when this entry created, the first attempt will happen. */
+ private int currentAttempts = 1;
+
+ /**
+ * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+ * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+ * retry request if true. And wil be reset to false after the retry fired.
+ */
+ private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+ public RetryableResultHandlerDelegator(
+ StreamRecord<IN> inputRecord,
+ ResultFuture<OUT> resultFuture,
+ ProcessingTimeService processingTimeService) {
+ this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+ this.processingTimeService = processingTimeService;
+ }
+
+ public void registerTimeout(long timeout) {
+ resultHandler.registerTimeout(processingTimeService, timeout);
+ }
+
+ @Override
+ public void complete(Collection<OUT> results) {
+ Preconditions.checkNotNull(
+ results, "Results must not be null, use empty collection
to emit nothing");
+ if (retryEnabled
+ && delayedRetryAvailable.get()
+ && resultHandler.inputRecord.isRecord()) {
+ // ignore repeated call(s)
+ if (!retryInFlight.compareAndSet(false, true)) {
Review Comment:
It seems a bit non-intuitive here, since one would expect `retryInFlight` to be `false` at the point the call is completed. Could we rename the variable to something like `lastRetryCompleted`?
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -197,11 +248,12 @@ public void processElement(StreamRecord<IN> record) throws Exception {
// add element first to the queue
final ResultFuture<OUT> entry = addToWorkQueue(element);
- final ResultHandler resultHandler = new ResultHandler(element, entry);
+ final RetryableResultHandlerDelegator resultHandler =
Review Comment:
I still think we'd better distinguish the two cases, since this is on the critical path (record-level operations). For each record we would pay the cost of extra method calls and atomic variable reads, so I think we should still use different handlers.
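For example, roughly (a sketch only, assuming `ResultHandler` also implements `ResultFuture<OUT>`, as the pre-existing code suggests):
```
final ResultFuture<OUT> entry = addToWorkQueue(element);
// plain handler on the non-retry path, delegator only when retry is enabled
final ResultFuture<OUT> resultHandler =
        retryEnabled
                ? new RetryableResultHandlerDelegator(element, entry, getProcessingTimeService())
                : new ResultHandler(element, entry);
```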
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -109,11 +130,18 @@
/** Whether object reuse has been enabled or disabled. */
private transient boolean isObjectReuseEnabled;
+ private transient Predicate<Collection<OUT>> retryResultPredicate;
+
+ private transient Predicate<Throwable> retryExceptionPredicate;
+
+ private transient AtomicBoolean delayedRetryAvailable;
Review Comment:
Might we add a comment for this flag?
Also, if possible I would tend to rename the variable to `retryDisabledOnFinish` and reverse the condition, to make its meaning more visible.
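e.g. something like (illustrative only; the comment wording is my assumption about the flag's intent):
```
/**
 * Set when the operator starts to finish and drain the in-flight requests; once set,
 * no new delayed retry may be scheduled anymore.
 */
private transient AtomicBoolean retryDisabledOnFinish;
```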
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
}
}
+ /**
+ * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+ * in-flight flag and remove retry handler from the incomplete retry handlers.
+ */
+ private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
Review Comment:
This could be moved into the `RetryableResultHandlerDelegator`.
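i.e. roughly (sketch only):
```
// inside RetryableResultHandlerDelegator
private void doRetryWithCleanup() throws Exception {
    doRetry(this);

    // reset retryInFlight for next possible retry
    retryInFlight.set(false);
    // remove from incomplete retry handlers
    inFlightDelayRetryHandlers.remove(this);
}
```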
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
}
}
+ /**
+ * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+ * in-flight flag and remove retry handler from the incomplete retry handlers.
+ */
+ private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+ throws Exception {
+ doRetry(resultHandlerDelegator);
+
+ // reset retryInFlight for next possible retry
+ resultHandlerDelegator.retryInFlight.set(false);
+ // remove from incomplete retry handlers
+ inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+ }
+
+ /** Increments number of attempts and fire the attempt. */
+ private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+ // increment current attempt number
+ resultHandlerDelegator.currentAttempts++;
+
+ // fire a new attempt
+ userFunction.asyncInvoke(
+ resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+ resultHandlerDelegator);
+ }
+
+ /** A delegator holds the real {@link ResultHandler} to handle retries. */
+ private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+ private final ResultHandler resultHandler;
+ private final ProcessingTimeService processingTimeService;
+
+ private ScheduledFuture<?> delayedRetryTimer;
+
+ /** start from 1, when this entry created, the first attempt will happen. */
+ private int currentAttempts = 1;
+
+ /**
+ * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+ * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+ * retry request if true. And wil be reset to false after the retry fired.
+ */
+ private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+ public RetryableResultHandlerDelegator(
+ StreamRecord<IN> inputRecord,
+ ResultFuture<OUT> resultFuture,
+ ProcessingTimeService processingTimeService) {
+ this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+ this.processingTimeService = processingTimeService;
+ }
+
+ public void registerTimeout(long timeout) {
+ resultHandler.registerTimeout(processingTimeService, timeout);
+ }
+
+ @Override
+ public void complete(Collection<OUT> results) {
+ Preconditions.checkNotNull(
+ results, "Results must not be null, use empty collection
to emit nothing");
+ if (retryEnabled
+ && delayedRetryAvailable.get()
+ && resultHandler.inputRecord.isRecord()) {
+ // ignore repeated call(s)
+ if (!retryInFlight.compareAndSet(false, true)) {
+ return;
+ }
+
+ processRetryInMailBox(results, null);
+ } else {
+ resultHandler.complete(results);
+ }
+ }
+
+ private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+ mailboxExecutor.submit(
+ () -> processRetry(this, results, error), "delayed retry or complete");
Review Comment:
It is not necessary to pass `this` as a separate parameter. Mixing access to properties via `this` and via `resultHandlerDelegator` inside `processRetry` complicates the implementation.
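i.e. roughly (a sketch only; the simplified `satisfy` check assumes the non-null predicate defaults suggested above):
```
private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
    mailboxExecutor.submit(() -> processRetry(results, error), "delayed retry or complete");
}

private void processRetry(Collection<OUT> results, Throwable error) {
    // access the delegator's own state directly instead of through a parameter
    boolean satisfy =
            (null != results && retryResultPredicate.test(results))
                    || (null != error && retryExceptionPredicate.test(error));
    // ... the rest stays the same, using `this` / `resultHandler` where
    // `resultHandlerDelegator` was used before
}
```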
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
}
}
+ /**
+ * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+ * in-flight flag and remove retry handler from the incomplete retry handlers.
+ */
+ private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+ throws Exception {
+ doRetry(resultHandlerDelegator);
+
+ // reset retryInFlight for next possible retry
+ resultHandlerDelegator.retryInFlight.set(false);
+ // remove from incomplete retry handlers
+ inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+ }
+
+ /** Increments number of attempts and fire the attempt. */
+ private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+ // increment current attempt number
+ resultHandlerDelegator.currentAttempts++;
+
+ // fire a new attempt
+ userFunction.asyncInvoke(
+ resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+ resultHandlerDelegator);
+ }
+
+ /** A delegator holds the real {@link ResultHandler} to handle retries. */
+ private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+ private final ResultHandler resultHandler;
+ private final ProcessingTimeService processingTimeService;
+
+ private ScheduledFuture<?> delayedRetryTimer;
+
+ /** start from 1, when this entry created, the first attempt will happen. */
+ private int currentAttempts = 1;
+
+ /**
+ * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+ * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+ * retry request if true. And wil be reset to false after the retry fired.
+ */
+ private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+ public RetryableResultHandlerDelegator(
+ StreamRecord<IN> inputRecord,
+ ResultFuture<OUT> resultFuture,
+ ProcessingTimeService processingTimeService) {
+ this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+ this.processingTimeService = processingTimeService;
+ }
+
+ public void registerTimeout(long timeout) {
+ resultHandler.registerTimeout(processingTimeService, timeout);
+ }
+
+ @Override
+ public void complete(Collection<OUT> results) {
+ Preconditions.checkNotNull(
+ results, "Results must not be null, use empty collection
to emit nothing");
+ if (retryEnabled
+ && delayedRetryAvailable.get()
+ && resultHandler.inputRecord.isRecord()) {
+ // ignore repeated call(s)
+ if (!retryInFlight.compareAndSet(false, true)) {
+ return;
+ }
+
+ processRetryInMailBox(results, null);
+ } else {
+ resultHandler.complete(results);
+ }
+ }
+
+ private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+ mailboxExecutor.submit(
+ () -> processRetry(this, results, error), "delayed retry or complete");
+ }
+
+ private void processRetry(
+ RetryableResultHandlerDelegator resultHandlerDelegator,
+ Collection<OUT> results,
+ Throwable error) {
+ boolean satisfy = false;
+ if (null != results && null != retryResultPredicate) {
+ satisfy = (satisfy || retryResultPredicate.test(results));
+ }
+ if (null != error && null != retryExceptionPredicate) {
+ satisfy = (satisfy || retryExceptionPredicate.test(error));
+ }
+ if (satisfy) {
+ if (asyncRetryStrategy.canRetry(currentAttempts)) {
+ long nextBackoffTimeMillis =
+         asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
+ if (delayedRetryAvailable.get()) {
+ final long delayedRetry =
+         nextBackoffTimeMillis
+                 + getProcessingTimeService().getCurrentProcessingTime();
+
+ // timer thread will finally dispatch the task to mailbox executor,
+ // and it can only be submitted once for one attempt.
+ delayedRetryTimer =
+         processingTimeService.registerTimer(
+                 delayedRetry,
+                 timestamp -> doRetryWithCleanup(resultHandlerDelegator));
+
+ // add to incomplete retry handlers, will remove it after retry fired
+ inFlightDelayRetryHandlers.add(resultHandlerDelegator);
+
+ return;
+ }
+ }
+ }
+ // retry unsatisfied, complete it
+ if (null != results) {
+ resultHandlerDelegator.resultHandler.complete(results);
+ } else {
+ resultHandlerDelegator.resultHandler.completeExceptionally(error);
+ }
+ }
+
+ @Override
+ public void completeExceptionally(Throwable error) {
Review Comment:
nit: move this to right after the `complete(...)` method?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]