lincoln-lil commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r919683662
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -109,11 +130,18 @@
/** Whether object reuse has been enabled or disabled. */
private transient boolean isObjectReuseEnabled;
+ private transient Predicate<Collection<OUT>> retryResultPredicate;
+
+ private transient Predicate<Throwable> retryExceptionPredicate;
+
+ private transient AtomicBoolean delayedRetryAvailable;
Review Comment:
`retryDisabledOnFinish` seems a better choice!
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -154,6 +190,14 @@ public void setup(
default:
throw new IllegalStateException("Unknown async mode: " +
outputMode + '.');
}
+ if
(asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()) {
Review Comment:
This simplification is great!
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -128,6 +156,15 @@ public AsyncWaitOperator(
this.timeout = timeout;
+ this.asyncRetryStrategy = asyncRetryStrategy;
+
+ this.retryEnabled =
+ // construct from utility class
+ asyncRetryStrategy != NO_RETRY_STRATEGY
+ // construct from api
+ ||
asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()
Review Comment:
You're right, the top level '||' should be '&&'
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
}
}
+ /**
+ * Besides doRetry, the cleanup work will be done after retry fired,
includes reset retry
+ * in-flight flag and remove retry handler from the incomplete retry
handlers.
+ */
+ private void doRetryWithCleanup(RetryableResultHandlerDelegator
resultHandlerDelegator)
+ throws Exception {
+ doRetry(resultHandlerDelegator);
+
+ // reset retryInFlight for next possible retry
+ resultHandlerDelegator.retryInFlight.set(false);
+ // remove from incomplete retry handlers
+ inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+ }
+
+ /** Increments number of attempts and fire the attempt. */
+ private void doRetry(RetryableResultHandlerDelegator
resultHandlerDelegator) throws Exception {
+ // increment current attempt number
+ resultHandlerDelegator.currentAttempts++;
+
+ // fire a new attempt
+ userFunction.asyncInvoke(
+ resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+ resultHandlerDelegator);
+ }
+
+ /** A delegator holds the real {@link ResultHandler} to handle retries. */
+ private class RetryableResultHandlerDelegator implements ResultFuture<OUT>
{
+
+ private final ResultHandler resultHandler;
+ private final ProcessingTimeService processingTimeService;
+
+ private ScheduledFuture<?> delayedRetryTimer;
+
+ /** start from 1, when this entry created, the first attempt will
happen. */
+ private int currentAttempts = 1;
+
+ /**
+ * A guard similar to ResultHandler.complete to prevent repeated
complete calls from
+ * ill-written AsyncFunction. This flag indicates a retry is
in-flight, will reject new
+ * retry request if true. And wil be reset to false after the retry
fired.
+ */
+ private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+ public RetryableResultHandlerDelegator(
+ StreamRecord<IN> inputRecord,
+ ResultFuture<OUT> resultFuture,
+ ProcessingTimeService processingTimeService) {
+ this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+ this.processingTimeService = processingTimeService;
+ }
+
+ public void registerTimeout(long timeout) {
+ resultHandler.registerTimeout(processingTimeService, timeout);
+ }
+
+ @Override
+ public void complete(Collection<OUT> results) {
+ Preconditions.checkNotNull(
+ results, "Results must not be null, use empty collection
to emit nothing");
+ if (retryEnabled
+ && delayedRetryAvailable.get()
+ && resultHandler.inputRecord.isRecord()) {
+ // ignore repeated call(s)
+ if (!retryInFlight.compareAndSet(false, true)) {
+ return;
+ }
+
+ processRetryInMailBox(results, null);
+ } else {
+ resultHandler.complete(results);
+ }
+ }
+
+ private void processRetryInMailBox(Collection<OUT> results, Throwable
error) {
+ mailboxExecutor.submit(
+ () -> processRetry(this, results, error), "delayed retry
or complete");
Review Comment:
This is debt left over from a previous refactor; it should indeed be cleaned up.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
}
}
+ /**
+ * Besides doRetry, the cleanup work will be done after retry fired,
includes reset retry
+ * in-flight flag and remove retry handler from the incomplete retry
handlers.
+ */
+ private void doRetryWithCleanup(RetryableResultHandlerDelegator
resultHandlerDelegator)
+ throws Exception {
+ doRetry(resultHandlerDelegator);
+
+ // reset retryInFlight for next possible retry
+ resultHandlerDelegator.retryInFlight.set(false);
+ // remove from incomplete retry handlers
+ inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+ }
+
+ /** Increments number of attempts and fire the attempt. */
+ private void doRetry(RetryableResultHandlerDelegator
resultHandlerDelegator) throws Exception {
+ // increment current attempt number
+ resultHandlerDelegator.currentAttempts++;
+
+ // fire a new attempt
+ userFunction.asyncInvoke(
+ resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+ resultHandlerDelegator);
+ }
+
+ /** A delegator holds the real {@link ResultHandler} to handle retries. */
+ private class RetryableResultHandlerDelegator implements ResultFuture<OUT>
{
+
+ private final ResultHandler resultHandler;
+ private final ProcessingTimeService processingTimeService;
+
+ private ScheduledFuture<?> delayedRetryTimer;
+
+ /** start from 1, when this entry created, the first attempt will
happen. */
+ private int currentAttempts = 1;
+
+ /**
+ * A guard similar to ResultHandler.complete to prevent repeated
complete calls from
+ * ill-written AsyncFunction. This flag indicates a retry is
in-flight, will reject new
+ * retry request if true. And wil be reset to false after the retry
fired.
+ */
+ private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+ public RetryableResultHandlerDelegator(
+ StreamRecord<IN> inputRecord,
+ ResultFuture<OUT> resultFuture,
+ ProcessingTimeService processingTimeService) {
+ this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+ this.processingTimeService = processingTimeService;
+ }
+
+ public void registerTimeout(long timeout) {
+ resultHandler.registerTimeout(processingTimeService, timeout);
+ }
+
+ @Override
+ public void complete(Collection<OUT> results) {
+ Preconditions.checkNotNull(
+ results, "Results must not be null, use empty collection
to emit nothing");
+ if (retryEnabled
+ && delayedRetryAvailable.get()
+ && resultHandler.inputRecord.isRecord()) {
+ // ignore repeated call(s)
+ if (!retryInFlight.compareAndSet(false, true)) {
Review Comment:
I struggled with the variable name for a while. How about naming it
`retryAwaiting`? It mainly indicates that a retry is being awaited but has
not been triggered yet.
The difference from `lastRetryCompleted` is that it does not mean the
retryHandler is completed/finished; it just indicates the status of waiting
for a retry. WDYT?
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -197,11 +248,12 @@ public void processElement(StreamRecord<IN> record)
throws Exception {
// add element first to the queue
final ResultFuture<OUT> entry = addToWorkQueue(element);
- final ResultHandler resultHandler = new ResultHandler(element, entry);
+ final RetryableResultHandlerDelegator resultHandler =
Review Comment:
From a performance perspective, I accept this approach. So the `retryEnabled`
flag can be removed from `RetryableResultHandlerDelegator`.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
}
}
+ /**
+ * Besides doRetry, the cleanup work will be done after retry fired,
includes reset retry
+ * in-flight flag and remove retry handler from the incomplete retry
handlers.
+ */
+ private void doRetryWithCleanup(RetryableResultHandlerDelegator
resultHandlerDelegator)
+ throws Exception {
+ doRetry(resultHandlerDelegator);
+
+ // reset retryInFlight for next possible retry
+ resultHandlerDelegator.retryInFlight.set(false);
+ // remove from incomplete retry handlers
+ inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+ }
+
+ /** Increments number of attempts and fire the attempt. */
+ private void doRetry(RetryableResultHandlerDelegator
resultHandlerDelegator) throws Exception {
+ // increment current attempt number
+ resultHandlerDelegator.currentAttempts++;
+
+ // fire a new attempt
+ userFunction.asyncInvoke(
+ resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+ resultHandlerDelegator);
+ }
+
+ /** A delegator holds the real {@link ResultHandler} to handle retries. */
+ private class RetryableResultHandlerDelegator implements ResultFuture<OUT>
{
+
+ private final ResultHandler resultHandler;
+ private final ProcessingTimeService processingTimeService;
+
+ private ScheduledFuture<?> delayedRetryTimer;
+
+ /** start from 1, when this entry created, the first attempt will
happen. */
+ private int currentAttempts = 1;
+
+ /**
+ * A guard similar to ResultHandler.complete to prevent repeated
complete calls from
+ * ill-written AsyncFunction. This flag indicates a retry is
in-flight, will reject new
+ * retry request if true. And wil be reset to false after the retry
fired.
+ */
+ private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+ public RetryableResultHandlerDelegator(
+ StreamRecord<IN> inputRecord,
+ ResultFuture<OUT> resultFuture,
+ ProcessingTimeService processingTimeService) {
+ this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+ this.processingTimeService = processingTimeService;
+ }
+
+ public void registerTimeout(long timeout) {
+ resultHandler.registerTimeout(processingTimeService, timeout);
+ }
+
+ @Override
+ public void complete(Collection<OUT> results) {
+ Preconditions.checkNotNull(
+ results, "Results must not be null, use empty collection
to emit nothing");
+ if (retryEnabled
+ && delayedRetryAvailable.get()
+ && resultHandler.inputRecord.isRecord()) {
+ // ignore repeated call(s)
+ if (!retryInFlight.compareAndSet(false, true)) {
+ return;
+ }
+
+ processRetryInMailBox(results, null);
+ } else {
+ resultHandler.complete(results);
+ }
+ }
+
+ private void processRetryInMailBox(Collection<OUT> results, Throwable
error) {
+ mailboxExecutor.submit(
+ () -> processRetry(this, results, error), "delayed retry
or complete");
+ }
+
+ private void processRetry(
+ RetryableResultHandlerDelegator resultHandlerDelegator,
+ Collection<OUT> results,
+ Throwable error) {
+ boolean satisfy = false;
+ if (null != results && null != retryResultPredicate) {
+ satisfy = (satisfy || retryResultPredicate.test(results));
+ }
+ if (null != error && null != retryExceptionPredicate) {
+ satisfy = (satisfy || retryExceptionPredicate.test(error));
+ }
+ if (satisfy) {
+ if (asyncRetryStrategy.canRetry(currentAttempts)) {
+ long nextBackoffTimeMillis =
+
asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
+ if (delayedRetryAvailable.get()) {
+ final long delayedRetry =
+ nextBackoffTimeMillis
+ +
getProcessingTimeService().getCurrentProcessingTime();
+
+ // timer thread will finally dispatch the task to
mailbox executor,
+ // and it can only be submitted once for one attempt.
+ delayedRetryTimer =
+ processingTimeService.registerTimer(
+ delayedRetry,
+ timestamp ->
doRetryWithCleanup(resultHandlerDelegator));
+
+ // add to incomplete retry handlers, will remove it
after retry fired
+ inFlightDelayRetryHandlers.add(resultHandlerDelegator);
+
+ return;
+ }
+ }
+ }
+ // retry unsatisfied, complete it
+ if (null != results) {
+ resultHandlerDelegator.resultHandler.complete(results);
+ } else {
+
resultHandlerDelegator.resultHandler.completeExceptionally(error);
+ }
+ }
+
+ @Override
+ public void completeExceptionally(Throwable error) {
Review Comment:
Yes, it looks more intuitive to keep them together.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
}
}
+ /**
+ * Besides doRetry, the cleanup work will be done after retry fired,
includes reset retry
+ * in-flight flag and remove retry handler from the incomplete retry
handlers.
+ */
+ private void doRetryWithCleanup(RetryableResultHandlerDelegator
resultHandlerDelegator)
+ throws Exception {
+ doRetry(resultHandlerDelegator);
+
+ // reset retryInFlight for next possible retry
+ resultHandlerDelegator.retryInFlight.set(false);
+ // remove from incomplete retry handlers
+ inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+ }
+
+ /** Increments number of attempts and fire the attempt. */
+ private void doRetry(RetryableResultHandlerDelegator
resultHandlerDelegator) throws Exception {
+ // increment current attempt number
+ resultHandlerDelegator.currentAttempts++;
+
+ // fire a new attempt
+ userFunction.asyncInvoke(
+ resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+ resultHandlerDelegator);
+ }
+
+ /** A delegator holds the real {@link ResultHandler} to handle retries. */
+ private class RetryableResultHandlerDelegator implements ResultFuture<OUT>
{
+
+ private final ResultHandler resultHandler;
+ private final ProcessingTimeService processingTimeService;
+
+ private ScheduledFuture<?> delayedRetryTimer;
+
+ /** start from 1, when this entry created, the first attempt will
happen. */
+ private int currentAttempts = 1;
+
+ /**
+ * A guard similar to ResultHandler.complete to prevent repeated
complete calls from
+ * ill-written AsyncFunction. This flag indicates a retry is
in-flight, will reject new
+ * retry request if true. And wil be reset to false after the retry
fired.
+ */
+ private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+ public RetryableResultHandlerDelegator(
+ StreamRecord<IN> inputRecord,
+ ResultFuture<OUT> resultFuture,
+ ProcessingTimeService processingTimeService) {
+ this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+ this.processingTimeService = processingTimeService;
+ }
+
+ public void registerTimeout(long timeout) {
+ resultHandler.registerTimeout(processingTimeService, timeout);
+ }
+
+ @Override
+ public void complete(Collection<OUT> results) {
+ Preconditions.checkNotNull(
+ results, "Results must not be null, use empty collection
to emit nothing");
+ if (retryEnabled
+ && delayedRetryAvailable.get()
+ && resultHandler.inputRecord.isRecord()) {
+ // ignore repeated call(s)
+ if (!retryInFlight.compareAndSet(false, true)) {
+ return;
+ }
+
+ processRetryInMailBox(results, null);
+ } else {
+ resultHandler.complete(results);
+ }
+ }
+
+ private void processRetryInMailBox(Collection<OUT> results, Throwable
error) {
+ mailboxExecutor.submit(
+ () -> processRetry(this, results, error), "delayed retry
or complete");
+ }
+
+ private void processRetry(
+ RetryableResultHandlerDelegator resultHandlerDelegator,
+ Collection<OUT> results,
+ Throwable error) {
+ boolean satisfy = false;
+ if (null != results && null != retryResultPredicate) {
+ satisfy = (satisfy || retryResultPredicate.test(results));
+ }
+ if (null != error && null != retryExceptionPredicate) {
+ satisfy = (satisfy || retryExceptionPredicate.test(error));
+ }
+ if (satisfy) {
+ if (asyncRetryStrategy.canRetry(currentAttempts)) {
+ long nextBackoffTimeMillis =
+
asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
+ if (delayedRetryAvailable.get()) {
+ final long delayedRetry =
+ nextBackoffTimeMillis
+ +
getProcessingTimeService().getCurrentProcessingTime();
+
+ // timer thread will finally dispatch the task to
mailbox executor,
+ // and it can only be submitted once for one attempt.
+ delayedRetryTimer =
+ processingTimeService.registerTimer(
+ delayedRetry,
+ timestamp ->
doRetryWithCleanup(resultHandlerDelegator));
+
+ // add to incomplete retry handlers, will remove it
after retry fired
Review Comment:
Do you mean that we add it from the first attempt (in processElement)? I tried
that, but gave it up considering that retries are probably relatively rare
compared with normal processing.
But for the missing removal when the retry finally completes, I'd like to
change it like this:
```
if (satisfy retry) {
....
if (currentAttempts == 1) {
// add to incomplete retry handlers only for first time
inFlightDelayRetryHandlers.add(this);
}
} else {
// remove handle that has been tried from incomplete retry handlers
if (currentAttempts > 1) {
inFlightDelayRetryHandlers.remove(this);
}
// resultHandler complete...
}
```
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
}
}
+ /**
+ * Besides doRetry, the cleanup work will be done after retry fired,
includes reset retry
+ * in-flight flag and remove retry handler from the incomplete retry
handlers.
+ */
+ private void doRetryWithCleanup(RetryableResultHandlerDelegator
resultHandlerDelegator)
+ throws Exception {
+ doRetry(resultHandlerDelegator);
+
+ // reset retryInFlight for next possible retry
+ resultHandlerDelegator.retryInFlight.set(false);
+ // remove from incomplete retry handlers
+ inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+ }
+
+ /** Increments number of attempts and fire the attempt. */
+ private void doRetry(RetryableResultHandlerDelegator
resultHandlerDelegator) throws Exception {
+ // increment current attempt number
+ resultHandlerDelegator.currentAttempts++;
+
+ // fire a new attempt
+ userFunction.asyncInvoke(
+ resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+ resultHandlerDelegator);
+ }
+
+ /** A delegator holds the real {@link ResultHandler} to handle retries. */
+ private class RetryableResultHandlerDelegator implements ResultFuture<OUT>
{
+
+ private final ResultHandler resultHandler;
+ private final ProcessingTimeService processingTimeService;
+
+ private ScheduledFuture<?> delayedRetryTimer;
+
+ /** start from 1, when this entry created, the first attempt will
happen. */
+ private int currentAttempts = 1;
+
+ /**
+ * A guard similar to ResultHandler.complete to prevent repeated
complete calls from
+ * ill-written AsyncFunction. This flag indicates a retry is
in-flight, will reject new
+ * retry request if true. And wil be reset to false after the retry
fired.
+ */
+ private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+ public RetryableResultHandlerDelegator(
+ StreamRecord<IN> inputRecord,
+ ResultFuture<OUT> resultFuture,
+ ProcessingTimeService processingTimeService) {
+ this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+ this.processingTimeService = processingTimeService;
+ }
+
+ public void registerTimeout(long timeout) {
+ resultHandler.registerTimeout(processingTimeService, timeout);
+ }
+
+ @Override
+ public void complete(Collection<OUT> results) {
+ Preconditions.checkNotNull(
+ results, "Results must not be null, use empty collection
to emit nothing");
+ if (retryEnabled
+ && delayedRetryAvailable.get()
+ && resultHandler.inputRecord.isRecord()) {
+ // ignore repeated call(s)
+ if (!retryInFlight.compareAndSet(false, true)) {
+ return;
+ }
+
+ processRetryInMailBox(results, null);
+ } else {
+ resultHandler.complete(results);
+ }
+ }
+
+ private void processRetryInMailBox(Collection<OUT> results, Throwable
error) {
+ mailboxExecutor.submit(
+ () -> processRetry(this, results, error), "delayed retry
or complete");
+ }
+
+ private void processRetry(
+ RetryableResultHandlerDelegator resultHandlerDelegator,
+ Collection<OUT> results,
+ Throwable error) {
+ boolean satisfy = false;
+ if (null != results && null != retryResultPredicate) {
+ satisfy = (satisfy || retryResultPredicate.test(results));
+ }
+ if (null != error && null != retryExceptionPredicate) {
+ satisfy = (satisfy || retryExceptionPredicate.test(error));
+ }
+ if (satisfy) {
+ if (asyncRetryStrategy.canRetry(currentAttempts)) {
+ long nextBackoffTimeMillis =
+
asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
+ if (delayedRetryAvailable.get()) {
+ final long delayedRetry =
+ nextBackoffTimeMillis
+ +
getProcessingTimeService().getCurrentProcessingTime();
+
+ // timer thread will finally dispatch the task to
mailbox executor,
+ // and it can only be submitted once for one attempt.
+ delayedRetryTimer =
+ processingTimeService.registerTimer(
+ delayedRetry,
+ timestamp ->
doRetryWithCleanup(resultHandlerDelegator));
+
+ // add to incomplete retry handlers, will remove it
after retry fired
+ inFlightDelayRetryHandlers.add(resultHandlerDelegator);
+
+ return;
Review Comment:
Makes sense.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]