C0urante commented on code in PR #12478:
URL: https://github.com/apache/kafka/pull/12478#discussion_r953898875


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -175,9 +179,8 @@ protected <V> V execAndRetry(Operation<V> operation) throws Exception {
                 log.trace("Caught a retriable exception while executing {} operation with {}", context.stage(), context.executingClass());
                 errorHandlingMetrics.recordFailure();
                 if (checkRetry(startTime)) {
-                    backoff(attempt, deadline);
-                    if (Thread.currentThread().isInterrupted()) {
-                        log.trace("Thread was interrupted. Marking operation as failed.");
+                    if (!backoff(attempt, deadline) || Thread.currentThread().isInterrupted()) {
+                        log.trace("Thread was interrupted or exit condition was triggered. Marking operation as failed.");

Review Comment:
   Nit: "exit condition was triggered" is pretty vague. What about "shutdown 
has been scheduled"?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -175,9 +179,8 @@ protected <V> V execAndRetry(Operation<V> operation) throws Exception {
                 log.trace("Caught a retriable exception while executing {} operation with {}", context.stage(), context.executingClass());
                 errorHandlingMetrics.recordFailure();
                 if (checkRetry(startTime)) {
-                    backoff(attempt, deadline);
-                    if (Thread.currentThread().isInterrupted()) {
-                        log.trace("Thread was interrupted. Marking operation as failed.");
+                    if (!backoff(attempt, deadline) || Thread.currentThread().isInterrupted()) {

Review Comment:
   We don't need the check to see if the thread is interrupted anymore; this 
was only included because of the semantics for `SystemTime::sleep`, which in 
turn used 
[`Utils::sleep`](https://github.com/apache/kafka/blob/5eff8592cc0cb57475c92bce6e80264d71a9bfbe/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L372-L383),
 which handled interruptions by restoring the thread's interrupt flag and then 
returning normally.
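
For readers unfamiliar with that sleep behavior, here is a minimal sketch of why the old code had to check the interrupt flag after backing off. The class and method names are hypothetical and only model the semantics described above; this is not a copy of the Kafka utility.

```java
final class InterruptSwallowingSleep {
    // Hypothetical helper: sleeps, and on interruption restores the
    // interrupt flag and returns normally instead of throwing.
    static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            // Restore the flag so callers can still observe the interruption
            Thread.currentThread().interrupt();
        }
    }

    // Because sleep() never throws, a caller can only detect an
    // interruption by inspecting the flag afterwards.
    static boolean sleepThenCheck(long ms) {
        sleep(ms);
        return Thread.currentThread().isInterrupted();
    }
}
```

Once the backoff no longer goes through a sleep of this kind, nothing leaves the flag set on the way out, which is the reason the explicit check can be dropped.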



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -269,7 +289,7 @@ void backoff(int attempt, long deadline) {
             delay = deadline - time.milliseconds();
         }
         log.debug("Sleeping for {} millis", delay);
-        time.sleep(delay);
+        return !exitLatch.await(delay, TimeUnit.MILLISECONDS);

Review Comment:
   I don't think we should use a return value here to indicate whether it's 
safe to continue or not, since it's unclear how to handle interruptions (if we 
get interrupted before we're scheduled for shutdown, we should keep retrying; 
if we're interrupted after being scheduled for shutdown, we should stop).
   
   We can use the pattern from `AbstractWorkerSourceTask` in this class: use a 
latch for backing off, but also set a boolean flag when we're scheduled for 
shutdown, and use the latter when checking whether we should perform another 
retry iteration.
   
   The control flow I have in mind is something like this:
   - `backoff` tries to block the thread for the specified delay, but can 
return early due to interruption, shutdown, whatever. There is no return value, 
and it swallows `InterruptedException` instances by just returning normally
   - We check the boolean flag before performing any retries and, if it's set, 
preemptively exit
   - `exit` counts down the latch and sets the boolean flag to indicate that we 
should halt execution



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -253,13 +256,30 @@ public ToleranceType getErrorToleranceType() {
         return errorToleranceType;
     }
 
-    // Visible for testing
+    /**
+     * Check whether we can continue retrying or not.
+     * Visible for testing.
+     * @param startTime the time in milliseconds when the retriable operation was first begun
+     * @return true if we can continue retrying; false if the retry timeout has been reached and we can't retry anymore
+     */
     boolean checkRetry(long startTime) {
+        if (errorRetryTimeout < 0) {
+            return true;
+        }
         return (time.milliseconds() - startTime) < errorRetryTimeout;

Review Comment:
   This can be simplified:
   
   ```suggestion
           return errorRetryTimeout < 0
                   || (time.milliseconds() - startTime) < errorRetryTimeout;
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -175,9 +179,8 @@ protected <V> V execAndRetry(Operation<V> operation) throws Exception {
                 log.trace("Caught a retriable exception while executing {} operation with {}", context.stage(), context.executingClass());
                 errorHandlingMetrics.recordFailure();
                 if (checkRetry(startTime)) {

Review Comment:
   I could be missing something, but do we even need `checkRetry`? Can we 
replace this with `time.milliseconds() < deadline`?
   
   Asking because we've added special logic around `errorRetryTimeout` being 
negative in two different places, which seems unnecessary.
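
To illustrate the idea, here is a sketch in which the negative-timeout case is handled once, when the deadline is computed, so the per-iteration check is a plain comparison. How the PR actually derives `deadline` is not shown in this thread, so the `Long.MAX_VALUE` mapping and both method names are assumptions.

```java
final class RetryDeadline {
    // Assumption: a negative timeout means "retry indefinitely", so it is
    // mapped to the largest possible deadline exactly once.
    static long deadlineFor(long startTimeMs, long retryTimeoutMs) {
        return retryTimeoutMs < 0 ? Long.MAX_VALUE : startTimeMs + retryTimeoutMs;
    }

    // With the special case folded into the deadline, this is the only
    // check needed before each retry.
    static boolean canRetry(long nowMs, long deadlineMs) {
        return nowMs < deadlineMs;
    }
}
```

For non-negative timeouts this gives the same answer as `(now - startTime) < errorRetryTimeout`, since both reduce to `now < startTime + errorRetryTimeout`.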



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -331,6 +351,15 @@ public synchronized Throwable error() {
         return this.context.error();
     }
 
+    /**
+     * Exit from any currently ongoing retry loop and mark the operation as failed.
+     * This can be called from a separate thread to break out of an infinite retry loop in

Review Comment:
   Nit: this is useful for more than just infinite retries
   ```suggestion
        * This can be called from a separate thread to break out of retry/backoff loops in
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -331,6 +351,15 @@ public synchronized Throwable error() {
         return this.context.error();
     }
 
+    /**
+     * Exit from any currently ongoing retry loop and mark the operation as failed.
+     * This can be called from a separate thread to break out of an infinite retry loop in
+     * {@link #execAndRetry(Operation)}
+     */
+    public void exit() {

Review Comment:
   Nit: I'm not a huge fan of `exit` since it has connotations of being able to 
immediately interrupt in-progress operations, which we don't provide at the 
moment. We could borrow from `WorkerTask` and call this `triggerStop` instead, 
and borrow from `AbstractWorkerSourceTask` and name the latch 
`stopRequestedLatch`.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -331,6 +351,15 @@ public synchronized Throwable error() {
         return this.context.error();
     }
 
+    /**
+     * Exit from any currently ongoing retry loop and mark the operation as failed.

Review Comment:
   This makes it sound like we can interrupt the operation that we're retrying, 
but that's not the case. And because Java doesn't really provide a great way to 
force a thread to stop or return control to a call site, we probably can't ever 
really accomplish that. I do like the part about marking the operation as 
failed, though--nice attention to detail there 👍
   
   What about "Attempt no further retries for any operations, and mark any 
ongoing operations that are blocked during backoff, or currently being invoked 
and do not complete successfully, as failed"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
