[
https://issues.apache.org/jira/browse/KAFKA-15100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
José Armando García Sancio resolved KAFKA-15100.
------------------------------------------------
Resolution: Fixed
> Unsafe to call tryCompleteFetchResponse on request timeout
> ----------------------------------------------------------
>
> Key: KAFKA-15100
> URL: https://issues.apache.org/jira/browse/KAFKA-15100
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Reporter: José Armando García Sancio
> Assignee: José Armando García Sancio
> Priority: Major
> Fix For: 3.6.0, 3.4.2, 3.5.2
>
>
> When the fetch request times out, the future is completed from the
> "raft-expiration-executor" SystemTimer thread. KafkaRaftClient assumes that
> tryCompleteFetchRequest is always called from the same thread. This
> invariant is violated in this case.
> {code:java}
> return future.handle((completionTimeMs, exception) -> {
>     if (exception != null) {
>         Throwable cause = exception instanceof ExecutionException ?
>             exception.getCause() : exception;
>
>         // If the fetch timed out in purgatory, it means no new data is available,
>         // and we will complete the fetch successfully. Otherwise, if there was
>         // any other error, we need to return it.
>         Errors error = Errors.forException(cause);
>         if (error != Errors.REQUEST_TIMED_OUT) {
>             logger.info("Failed to handle fetch from {} at {} due to {}",
>                 replicaId, fetchPartition.fetchOffset(), error);
>             return buildEmptyFetchResponse(error, Optional.empty());
>         }
>     }
>
>     // FIXME: `completionTimeMs`, which can be null
>     logger.trace("Completing delayed fetch from {} starting at offset {} at {}",
>         replicaId, fetchPartition.fetchOffset(), completionTimeMs);
>
>     return tryCompleteFetchRequest(replicaId, fetchPartition, time.milliseconds());
> });
> {code}
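The threading problem described above can be reproduced outside Kafka. The sketch below is not Kafka code: the class name `CallbackThreadSketch` and the helper `callbackThreadName` are made up for illustration. It relies only on the documented `CompletableFuture` behavior that a non-async dependent action such as `handle` may run on whichever thread completes the future, here a thread named after the timer thread from the report.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-alone demo; none of these names exist in Kafka.
final class CallbackThreadSketch {

    // Returns the name of the thread that executed the handle() callback.
    static String callbackThreadName() throws InterruptedException {
        CompletableFuture<Long> future = new CompletableFuture<>();
        AtomicReference<String> ranOn = new AtomicReference<>();

        // Registered by the calling thread (standing in for the KRaft polling
        // thread) while the future is still incomplete.
        future.handle((completionTimeMs, exception) -> {
            ranOn.set(Thread.currentThread().getName());
            return null;
        });

        // A second thread, standing in for the expiration timer, times the
        // request out by completing the future exceptionally.
        Thread timer = new Thread(
            () -> future.completeExceptionally(new TimeoutException("fetch timed out")),
            "raft-expiration-executor");
        timer.start();
        timer.join();

        return ranOn.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("callback ran on: " + callbackThreadName());
    }
}
```

Any state touched inside the callback is therefore accessed from the timer thread, which is why calling into single-threaded KafkaRaftClient logic from there is unsafe.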
> One solution is to always build an empty response if the future was completed
> exceptionally. This works because the ExpirationService completes the future
> with a `TimeoutException`.
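A minimal sketch of the first proposal, assuming simplified stand-in types: `ErrorCode`, `FetchResponse`, `errorFor`, `readFromLog` and `whenFetchCompletes` are hypothetical and only mirror the shape of the snippet in the report. It is not the actual patch. The point is that every exceptional completion, including the timeout, returns an empty response without touching state owned by the KRaft thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-ins; Kafka's real Errors and response types differ.
final class EmptyResponseOnTimeoutSketch {
    enum ErrorCode { NONE, REQUEST_TIMED_OUT, UNKNOWN_SERVER_ERROR }

    static final class FetchResponse {
        final ErrorCode error;
        final boolean hasRecords;

        FetchResponse(ErrorCode error, boolean hasRecords) {
            this.error = error;
            this.hasRecords = hasRecords;
        }
    }

    // Simplified mapping in the spirit of Errors.forException.
    static ErrorCode errorFor(Throwable cause) {
        return cause instanceof TimeoutException
            ? ErrorCode.REQUEST_TIMED_OUT
            : ErrorCode.UNKNOWN_SERVER_ERROR;
    }

    // Builds a response from its arguments only, so it is safe on any thread.
    static FetchResponse buildEmptyFetchResponse(ErrorCode error) {
        return new FetchResponse(error, false);
    }

    // Stand-in for work that reads replicated state (KRaft thread only).
    static FetchResponse readFromLog() {
        return new FetchResponse(ErrorCode.NONE, true);
    }

    static CompletableFuture<FetchResponse> whenFetchCompletes(CompletableFuture<Long> future) {
        return future.handle((completionTimeMs, exception) -> {
            if (exception != null) {
                boolean wrapped = exception instanceof ExecutionException
                    || exception instanceof CompletionException;
                Throwable cause = wrapped && exception.getCause() != null
                    ? exception.getCause() : exception;

                ErrorCode error = errorFor(cause);
                if (error == ErrorCode.REQUEST_TIMED_OUT) {
                    // Timing out in purgatory means no new data arrived, so
                    // the fetch succeeds with an empty response.
                    error = ErrorCode.NONE;
                }
                // Every exceptional path ends here; no shared state is read.
                return buildEmptyFetchResponse(error);
            }
            // Normal completion: assumed to happen on the KRaft thread.
            return readFromLog();
        });
    }
}
```

With this shape, the only branch that can run on the timer thread is the one that builds a response purely from its arguments.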
> A longer-term solution is to use a more flexible event executor service. This
> would be a service that allows more kinds of events to be scheduled on or
> submitted to the KRaft thread.
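One possible shape for such a service, sketched under assumptions: `SingleThreadEventQueueSketch`, `submit` and `poll` are hypothetical names, not an existing Kafka API. Any thread may enqueue an event, but only the owning thread (standing in for the KRaft thread) ever runs it, so the single-thread invariant holds even when a timer thread triggers the work.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical event queue: submit() is thread-safe, poll() is owner-only.
final class SingleThreadEventQueueSketch {
    private final BlockingQueue<Runnable> events = new LinkedBlockingQueue<>();

    // May be called from any thread, e.g. a timer thread on request timeout.
    void submit(Runnable event) {
        events.add(event);
    }

    // Must be called only by the owning thread. Waits up to timeoutMs for the
    // first event, then drains whatever else is queued. Returns the number of
    // events that were run.
    int poll(long timeoutMs) throws InterruptedException {
        int processed = 0;
        Runnable event = events.poll(timeoutMs, TimeUnit.MILLISECONDS);
        while (event != null) {
            event.run();
            processed++;
            event = events.poll(); // non-blocking for the remaining events
        }
        return processed;
    }
}
```

In this design the timer thread would submit a "fetch timed out" event instead of completing the response itself, and the owning thread would build the response on its next `poll`.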