We know that in the async send mode, kafka do not guarantee the message order
even for the same partition.
That is, if we send 3 request ( the same topic, the same partition) to a
kafka server in the async mode,
the send order is 1, 2, 3 (correlation id is 1, 2, 3), while the kafka server
maybe save the 3 request in the log by the order 3, 2, 1, and return to the
client by the order 2, 3, 1??
This happens because Kafka server processes requests with multi threads(multi
KafkaRequestHandler).
If the above is true, below in the 0.9.0 java client idk maybe has problem:
In the class NetworkClient, there is a collection inFlightRequests to maintain
all the in flight request:
private final InFlightRequests inFlightRequests;
final class InFlightRequests {
private final int maxInFlightRequestsPerConnection;
private final Map<String, Deque<ClientRequest>> requests = new
HashMap<String, Deque<ClientRequest>>(); ...}
It use a Deque to maintain the in flight requests whose response has not come
back.
Whenever we send a request, we will enqueue the request, and when the response
come back, we will dequeue the request.
private void doSend(ClientRequest request, long now) {
request.setSendTimeMs(now);
this.inFlightRequests.add(request);
selector.send(request.request());
}private void handleCompletedReceives(List<ClientResponse> responses, long now)
{
for (NetworkReceive receive : this.selector.completedReceives()) {
String source = receive.source();
ClientRequest req = inFlightRequests.completeNext(source);
ResponseHeader header = ResponseHeader.parse(receive.payload());
// Always expect the response version id to be the same as the request
version id
short apiKey = req.request().header().apiKey();
short apiVer = req.request().header().apiVersion();
Struct body = (Struct) ProtoUtils.responseSchema(apiKey,
apiVer).read(receive.payload());
correlate(req.request().header(), header);
if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
responses.add(new ClientResponse(req, now, false, body));
}
}
but if the request order and the response order does not match, is it the
Deque suitable? or it should be use a Map to maintain the request?
By the way, in the above, there is a function correlate(xxx) to check the
match, if not match, it will throw a exception.private void
correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
if (requestHeader.correlationId() != responseHeader.correlationId())
throw new IllegalStateException("Correlation id for response (" +
responseHeader.correlationId()
+ ") does not match request (" + requestHeader.correlationId()
+ ")");
}
But in the async mode, as mentioned above, the mismatch is normal, and likely
happen.
So here is it enough to process the problem by just throwing an exception ?