We know that in async send mode, Kafka does not guarantee message order,
even within the same partition.


That is, if we send 3 requests (same topic, same partition) to a Kafka
server in async mode, and the send order is 1, 2, 3 (correlation ids 1, 2, 3),
could the Kafka server save the 3 requests in the log in the order 3, 2, 1
and return the responses to the client in the order 2, 3, 1?


This happens because the Kafka server processes requests with multiple threads
(multiple KafkaRequestHandler instances).


If the above is true, the following code in the 0.9.0 Java client may have a
problem:


In the class NetworkClient, there is a collection, inFlightRequests, that
keeps track of all in-flight requests:


private final InFlightRequests inFlightRequests;

final class InFlightRequests {

    private final int maxInFlightRequestsPerConnection;
    private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();
    ...
}

It uses a Deque to hold the in-flight requests whose responses have not come
back yet. Whenever we send a request, we enqueue it, and when the response
comes back, we dequeue it.

private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    this.inFlightRequests.add(request);
    selector.send(request.request());
}

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        ClientRequest req = inFlightRequests.completeNext(source);
        ResponseHeader header = ResponseHeader.parse(receive.payload());
        // Always expect the response version id to be the same as the request version id
        short apiKey = req.request().header().apiKey();
        short apiVer = req.request().header().apiVersion();
        Struct body = (Struct) ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
        correlate(req.request().header(), header);
        if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
            responses.add(new ClientResponse(req, now, false, body));
    }
}

But if the request order and the response order do not match, is a Deque
suitable here? Or should a Map be used to keep track of the requests?
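
To make the Map alternative concrete, here is a minimal sketch of tracking
in-flight requests by correlation id. The class CorrelationIdTracker and its
methods are hypothetical names used only for illustration, not part of the
Kafka client, and requests are reduced to plain strings. It assumes responses
really can arrive out of order, which is exactly the point I am unsure about.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the Kafka client: tracks in-flight
// requests by correlation id instead of by send order.
final class CorrelationIdTracker {
    private final Map<Integer, String> inFlight = new HashMap<>();

    // Remember a request until its response arrives.
    void add(int correlationId, String request) {
        inFlight.put(correlationId, request);
    }

    // Find the request a response belongs to, whatever the arrival order.
    String complete(int correlationId) {
        String request = inFlight.remove(correlationId);
        if (request == null)
            throw new IllegalStateException(
                    "No in-flight request with correlation id " + correlationId);
        return request;
    }

    // Number of requests still waiting for a response.
    int size() {
        return inFlight.size();
    }

    public static void main(String[] args) {
        CorrelationIdTracker tracker = new CorrelationIdTracker();
        tracker.add(1, "request-1");
        tracker.add(2, "request-2");
        tracker.add(3, "request-3");
        // Responses arrive in the order 2, 3, 1 from the example above.
        for (int id : new int[] {2, 3, 1})
            System.out.println(id + " -> " + tracker.complete(id));
    }
}
```

With this kind of bookkeeping, responses arriving in the order 2, 3, 1 would
each still find their own request.
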
By the way, in the code above there is a function correlate(xxx) that checks
that the correlation ids match; if they do not match, it throws an exception.

private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
    if (requestHeader.correlationId() != responseHeader.correlationId())
        throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId()
                + ") does not match request (" + requestHeader.correlationId() + ")");
}

But in async mode, as mentioned above, a mismatch is normal and likely to
happen. So is it enough to handle the problem here by just throwing an
exception?
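
For illustration, here is a small sketch of what FIFO matching does when the
two orders differ. FifoMatcher is a hypothetical stand-in for the Deque-based
logic quoted above, with requests reduced to their correlation ids; it is not
the real client code, and it again assumes that out-of-order responses are
possible at all.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for Deque-based matching: each response is paired
// with the oldest in-flight request, then the correlation ids are compared.
final class FifoMatcher {
    private final Deque<Integer> inFlight = new ArrayDeque<>();

    // Enqueue the correlation id of a request that was just sent.
    void send(int correlationId) {
        inFlight.addLast(correlationId);
    }

    // Dequeue the oldest request and check it against the response.
    void receive(int responseCorrelationId) {
        Integer oldest = inFlight.pollFirst();
        if (oldest == null)
            throw new IllegalStateException(
                    "No in-flight request for response " + responseCorrelationId);
        if (oldest.intValue() != responseCorrelationId)
            throw new IllegalStateException("Correlation id for response ("
                    + responseCorrelationId + ") does not match request (" + oldest + ")");
    }

    // Returns true if every response matched, false if any check threw.
    static boolean acceptsOrder(int[] sendOrder, int[] responseOrder) {
        FifoMatcher matcher = new FifoMatcher();
        for (int id : sendOrder)
            matcher.send(id);
        try {
            for (int id : responseOrder)
                matcher.receive(id);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        int[] sent = {1, 2, 3};
        System.out.println(acceptsOrder(sent, new int[] {1, 2, 3}));
        System.out.println(acceptsOrder(sent, new int[] {2, 3, 1}));
    }
}
```

Under these assumptions, the response order 1, 2, 3 is accepted, while
2, 3, 1 fails on the very first response.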
