trompa commented on PR #133:
URL:
https://github.com/apache/flink-connector-kafka/pull/133#issuecomment-2701005587
Added the wakeUp loop on my tests:
```
while (t.isAlive() && System.currentTimeMillis() < deadline) {
reader.wakeUp();
Thread.sleep(10);
}
```
with short periods, it happen to throw, reason is because on first wakeUp it
retries the consumer.position() call and a new wakeup is triggered that is not
wrpped.
would it make sense to wrap that second call?
private <V> V retryOnWakeup(Supplier<V> consumerCall, String
description) {
try {
return consumerCall.get();
} catch (WakeupException we) {
LOG.info(
"Caught WakeupException while executing Kafka consumer
call for {}. Will retry the consumer call.",
description);
- return consumerCall.get();
+ return retryOnWakeup(consumerCall, description);
}
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]