C0urante commented on code in PR #12478:
URL: https://github.com/apache/kafka/pull/12478#discussion_r953898875
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -175,9 +179,8 @@ protected <V> V execAndRetry(Operation<V> operation) throws Exception {
log.trace("Caught a retriable exception while executing {} operation with {}", context.stage(), context.executingClass());
errorHandlingMetrics.recordFailure();
if (checkRetry(startTime)) {
- backoff(attempt, deadline);
- if (Thread.currentThread().isInterrupted()) {
- log.trace("Thread was interrupted. Marking operation as failed.");
+ if (!backoff(attempt, deadline) || Thread.currentThread().isInterrupted()) {
+ log.trace("Thread was interrupted or exit condition was triggered. Marking operation as failed.");
Review Comment:
Nit: "exit condition was triggered" is pretty vague. What about "shutdown
has been scheduled"?
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -175,9 +179,8 @@ protected <V> V execAndRetry(Operation<V> operation) throws Exception {
log.trace("Caught a retriable exception while executing {} operation with {}", context.stage(), context.executingClass());
errorHandlingMetrics.recordFailure();
if (checkRetry(startTime)) {
- backoff(attempt, deadline);
- if (Thread.currentThread().isInterrupted()) {
- log.trace("Thread was interrupted. Marking operation as failed.");
+ if (!backoff(attempt, deadline) || Thread.currentThread().isInterrupted()) {
Review Comment:
We don't need the check to see if the thread is interrupted anymore; this
was only included because of the semantics for `SystemTime::sleep`, which in
turn used
[Utils::sleep](https://github.com/apache/kafka/blob/5eff8592cc0cb57475c92bce6e80264d71a9bfbe/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L372-L383),
which handled interruptions by resetting the thread interrupt flag and then
returning normally.
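For illustration, here is a minimal sketch of the sleep semantics described above. It is not a copy of Kafka's `Utils.sleep`, and `SleepSemantics`, `sleepRestoringInterrupt`, and `sleepThenCheck` are hypothetical names. The interruption is caught, the interrupt flag is restored, and the method returns normally, so the caller can only notice the interruption by checking `isInterrupted()` afterwards.

```java
// Hypothetical helpers modeling the behavior described in the comment above.
final class SleepSemantics {
    // Catches the interruption, restores the thread's interrupt flag,
    // and returns normally instead of propagating the exception.
    static void sleepRestoringInterrupt(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Because the sleep returns normally, the caller must check the flag
    // itself; this is why the old retry loop needed an isInterrupted() check.
    static boolean sleepThenCheck(long ms) {
        sleepRestoringInterrupt(ms);
        return Thread.currentThread().isInterrupted();
    }
}
```

Once backoff is driven by a latch instead of `SystemTime::sleep`, this flag-restoring behavior no longer applies, which is why the extra check can go.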
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -269,7 +289,7 @@ void backoff(int attempt, long deadline) {
delay = deadline - time.milliseconds();
}
log.debug("Sleeping for {} millis", delay);
- time.sleep(delay);
+ return !exitLatch.await(delay, TimeUnit.MILLISECONDS);
Review Comment:
I don't think we should use a return value here to indicate whether it's
safe to continue or not, since it's unclear how to handle interruptions (if we
get interrupted before we're scheduled for shutdown, we should keep retrying;
if we're interrupted after being scheduled for shutdown, we should stop).
We can use the pattern from `AbstractWorkerSourceTask` in this class: use a
latch for backing off, but also set a boolean flag when we're scheduled for
shutdown, and use the latter when checking whether we should perform another
retry iteration.
The control flow I have in mind is something like this:
- `backoff` tries to block the thread for the specified delay, but can
return early due to interruption, shutdown, whatever. There is no return value,
and it swallows `InterruptedException` instances by just returning normally
- We check the boolean flag before performing any retries and, if it's set,
preemptively exit
- `exit` counts down the latch and sets the boolean flag to indicate that we
should halt execution
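A rough sketch of that control flow follows. It assumes a simplified operator that takes a `java.util.concurrent.Callable`, uses a fixed backoff and attempt count, and signals failure by returning `null`; the real class records failures in its context instead. `triggerStop` and `stopRequestedLatch` borrow the names suggested later in this review, while `StoppableRetrier`, `stopping`, `backoffMs`, and `maxAttempts` are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for RetryWithToleranceOperator that models
// only the stop/backoff interplay proposed above.
class StoppableRetrier {
    private final CountDownLatch stopRequestedLatch = new CountDownLatch(1);
    // Set once a stop is requested; checked before every retry.
    private volatile boolean stopping = false;
    private final long backoffMs;
    private final int maxAttempts;

    StoppableRetrier(long backoffMs, int maxAttempts) {
        this.backoffMs = backoffMs;
        this.maxAttempts = maxAttempts;
    }

    // Returns the operation's result, or null if the last allowed attempt
    // failed or a stop was requested before the next retry could start.
    <V> V execAndRetry(Callable<V> operation) {
        for (int attempt = 1; ; attempt++) {
            try {
                return operation.call();
            } catch (Exception e) {
                if (attempt >= maxAttempts) {
                    return null; // retries exhausted
                }
            }
            backoff(backoffMs);
            if (stopping) {
                return null; // stop requested: no further retries, mark as failed
            }
        }
    }

    // Blocks for up to delayMs, returning early if triggerStop() is called.
    // No return value: an InterruptedException is swallowed and we just return.
    void backoff(long delayMs) {
        try {
            stopRequestedLatch.await(delayMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // The caller checks the stopping flag, so an interruption alone
            // does not end the retry loop.
        }
    }

    // May be called from another thread.
    void triggerStop() {
        stopping = true; // set the flag first so a woken backoff sees it
        stopRequestedLatch.countDown();
    }
}
```

Keeping the decision in the flag rather than in `backoff`'s return value is what makes interruptions unambiguous: an interrupt before `triggerStop` leads to another attempt, while any wake-up after it leads to failure.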
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -253,13 +256,30 @@ public ToleranceType getErrorToleranceType() {
return errorToleranceType;
}
- // Visible for testing
+ /**
+ * Check whether we can continue retrying or not.
+ * Visible for testing.
+ * @param startTime the time in milliseconds when the retriable operation was first begun
+ * @return true if we can continue retrying; false if the retry timeout has been reached and we can't retry anymore
+ */
boolean checkRetry(long startTime) {
+ if (errorRetryTimeout < 0) {
+ return true;
+ }
return (time.milliseconds() - startTime) < errorRetryTimeout;
Review Comment:
This can be simplified:
```suggestion
return errorRetryTimeout < 0
|| (time.milliseconds() - startTime) < errorRetryTimeout;
```
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -175,9 +179,8 @@ protected <V> V execAndRetry(Operation<V> operation) throws Exception {
log.trace("Caught a retriable exception while executing {} operation with {}", context.stage(), context.executingClass());
errorHandlingMetrics.recordFailure();
if (checkRetry(startTime)) {
Review Comment:
I could be missing something, but do we even need `checkRetry`? Can we
replace this with `time.milliseconds() < deadline`?
Asking because we've added special logic around `errorRetryTimeout` being
negative in two different places, which seems unnecessary.
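A minimal sketch of that idea follows. It assumes `deadline` is computed once as `startTime + errorRetryTimeout`, with a negative timeout mapped to `Long.MAX_VALUE`; this is an assumption, since the hunks here don't show how `deadline` is derived. Under that assumption the negative-timeout special case lives in one place and the retry check becomes a plain comparison. `RetryDeadline`, `deadline`, and `canRetry` are hypothetical names.

```java
// Hypothetical helpers: compute the retry deadline once so the
// "negative timeout means retry forever" rule is handled in a single place.
final class RetryDeadline {
    static long deadline(long startTimeMs, long errorRetryTimeoutMs) {
        if (errorRetryTimeoutMs < 0) {
            return Long.MAX_VALUE; // negative timeout: retry indefinitely
        }
        long d = startTimeMs + errorRetryTimeoutMs;
        // Guard against overflow for very large timeouts.
        return d < startTimeMs ? Long.MAX_VALUE : d;
    }

    // Equivalent to (now - start) < timeout when no overflow occurs.
    static boolean canRetry(long nowMs, long deadlineMs) {
        return nowMs < deadlineMs;
    }
}
```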
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -331,6 +351,15 @@ public synchronized Throwable error() {
return this.context.error();
}
+ /**
+ * Exit from any currently ongoing retry loop and mark the operation as failed.
+ * This can be called from a separate thread to break out of an infinite retry loop in
Review Comment:
Nit: this is useful for more than just infinite retries
```suggestion
* This can be called from a separate thread to break out of retry/backoff loops in
```
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -331,6 +351,15 @@ public synchronized Throwable error() {
return this.context.error();
}
+ /**
+ * Exit from any currently ongoing retry loop and mark the operation as failed.
+ * This can be called from a separate thread to break out of an infinite retry loop in
+ * {@link #execAndRetry(Operation)}
+ */
+ public void exit() {
Review Comment:
Nit: I'm not a huge fan of `exit` since it has connotations of being able to
immediately interrupt in-progress operations, which we don't provide at the
moment. We could borrow from `WorkerTask` and call this `triggerStop` instead,
and borrow from `AbstractWorkerSourceTask` and name the latch
`stopRequestedLatch`.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -331,6 +351,15 @@ public synchronized Throwable error() {
return this.context.error();
}
+ /**
+ * Exit from any currently ongoing retry loop and mark the operation as failed.
Review Comment:
This makes it sound like we can interrupt the operation that we're retrying,
but that's not the case. And because Java doesn't really provide a great way to
force a thread to stop or return control to a call site, we probably can't ever
really accomplish that. I do like the part about marking the operation as
failed, though. Nice attention to detail there 👍
What about "Attempt no further retries for any operations, and mark any
ongoing operations that are blocked during backoff, or currently being invoked
and do not complete successfully, as failed"?