yashmayya commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1167384274


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -114,6 +127,7 @@ public class Worker {
 
     public static final long CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(5);
     public static final long EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(1);
+    public static final long ALTER_OFFSETS_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(5);

Review Comment:
   Thanks for the detailed response and it sounds like we're on the same page 
now. I've refactored the alter offsets worker API to be asynchronous.
   
   > possible additional guard to prevent cancelled source tasks from 
committing offsets
   
   I guess we don't really need to worry too much about cancelled source tasks 
since during regular task stop, we also remove the periodic offset commit task 
in the `SourceTaskOffsetCommitter`?
   
   > one way we could try to decrease the risks of an asynchronous API for 
non-exactly-once source connectors could be to refuse to assign tasks for 
connectors with ongoing offset alterations during rebalance, even if the 
connector is resumed
   
   That's an interesting idea but it does seem to be a pretty invasive change 
w.r.t the current rebalancing logic which is agnostic to all on-going 
operations in the workers. The limitation is also a valid one and yeah the same 
risks apply even with the sync API although I'm not sure I follow why you think 
it's less likely? Isn't it more likely that a synchronous alter offsets request 
hangs and causes the leader to fall out of the group leading to a new leader 
being elected?
   
   > We could also try to add interruption logic that cancels any in-progress 
offset alter/reset requests when a rebalance starts
   
   We would need to be careful about the exact points where we allow 
interruptions. For instance, we wouldn't want to abandon a request midway 
through writing offsets (in the non-EoS source connector case where it isn't an 
atomic operation, or for consumer groups when we're altering offsets for some 
partitions + resetting offsets for some others). Although, this does seem like 
a more appealing option overall and I've filed this Jira as a potential follow 
up item - https://issues.apache.org/jira/browse/KAFKA-14910



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to