Bill Sobel created KAFKA-2060:
---------------------------------

             Summary: Async onCompletion callback may not be called
                 Key: KAFKA-2060
                 URL: https://issues.apache.org/jira/browse/KAFKA-2060
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 0.8.1.2
         Environment: All
            Reporter: Bill Sobel
            Priority: Critical


The 'done' function in RecordBatch.java attempts to enumerate and call each 
onCompletion() callback.  However, the call to thunk.future.get() can throw an 
exception.  When this occurs, the callback is not invoked.  This appears to be 
the only place where the callback for an async send would not be invoked, 
leaving that callback orphaned.
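
The failure mode described above can be reproduced outside Kafka with a small
sketch. Everything here is a hypothetical stand-in (OrphanedCallbackDemo,
Callback, Thunk, and a String in place of RecordMetadata); none of it is the
producer's actual code. It only assumes that the result of future.get() is
passed straight to onCompletion() inside a single try block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-ins for illustration only; these are not Kafka classes.
class OrphanedCallbackDemo {
    interface Callback {
        void onCompletion(String metadata, Exception exception);
    }

    static final class Thunk {
        final Callback callback;
        final Future<String> future;

        Thunk(Callback callback, Future<String> future) {
            this.callback = callback;
            this.future = future;
        }
    }

    // Assumed pattern: get() is evaluated as an argument to onCompletion().
    static void completeAll(List<Thunk> thunks) {
        for (Thunk thunk : thunks) {
            try {
                thunk.callback.onCompletion(thunk.future.get(), null);
            } catch (Exception e) {
                // Reached when get() throws (or when the callback itself throws).
                // In the get() case, onCompletion() was never called for this thunk.
                System.err.println("Error executing callback: " + e);
            }
        }
    }

    static List<String> run() {
        List<String> seen = new ArrayList<>();
        Callback record = (m, e) -> seen.add(e == null ? "ok:" + m : "error");

        // A future whose get() throws ExecutionException.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("simulated failure"));

        List<Thunk> thunks = new ArrayList<>();
        thunks.add(new Thunk(record, CompletableFuture.completedFuture("offset-0")));
        thunks.add(new Thunk(record, failed));
        completeAll(thunks);
        return seen;
    }

    public static void main(String[] args) {
        // Two sends were registered, but only one callback is recorded.
        System.out.println(run());
    }
}
```

In this sketch the second callback is never told whether its send succeeded or
failed, which is the orphaned-callback situation reported here.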

The call to thunk.future.get() appears to need its own try/catch.  If get() 
does not throw, onCompletion should be called with the result; if it does 
throw, thunk.callback.onCompletion(null, recordException) should be called.

e.g.

    /**
     * Complete the request
     *
     * @param baseOffset The base offset of the messages assigned by the server
     * @param exception The exception that occurred (or null if the request was successful)
     */
    public void done(long baseOffset, RuntimeException exception) {
        this.produceFuture.done(topicPartition, baseOffset, exception);
        log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
                  topicPartition,
                  baseOffset,
                  exception);
        // execute callbacks
        for (int i = 0; i < this.thunks.size(); i++) {
            try {
                Thunk thunk = this.thunks.get(i);
                if (exception == null) {
                    RecordMetadata rc = null;
                    try {
                        rc = thunk.future.get();
                    } catch (Exception recordException) {
                        thunk.callback.onCompletion(null, recordException);
                    }
                    if (rc != null) {
                        thunk.callback.onCompletion(rc, null);
                    }
                } else {
                    thunk.callback.onCompletion(null, exception);
                }
            } catch (Exception e) {
                log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e);
            }
        }
    }
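
For anyone who wants to try the idea in isolation, here is a minimal
self-contained sketch of the same approach. The names GuaranteedCallbackDemo,
Callback, and Thunk, and the use of a String in place of RecordMetadata, are
assumptions made for illustration and are not the producer's real types. It
differs slightly from the snippet above: it tracks the failure in a separate
variable instead of testing the result for null, so a future that returns
null without throwing would still trigger its callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-ins for illustration only; these are not Kafka classes.
class GuaranteedCallbackDemo {
    interface Callback {
        void onCompletion(String metadata, Exception exception);
    }

    static final class Thunk {
        final Callback callback;
        final Future<String> future;

        Thunk(Callback callback, Future<String> future) {
            this.callback = callback;
            this.future = future;
        }
    }

    static void completeAll(List<Thunk> thunks) {
        for (Thunk thunk : thunks) {
            String metadata = null;
            Exception failure = null;
            // Step 1: resolve the future in its own try/catch.
            try {
                metadata = thunk.future.get();
            } catch (Exception e) {
                failure = e;
            }
            // Step 2: invoke the callback exactly once, with a result or an error.
            try {
                thunk.callback.onCompletion(failure == null ? metadata : null, failure);
            } catch (Exception e) {
                // A misbehaving callback must not stop the remaining callbacks.
                System.err.println("Error executing user-provided callback: " + e);
            }
        }
    }

    static List<String> run() {
        List<String> seen = new ArrayList<>();
        Callback record = (m, e) ->
            seen.add(e == null ? "ok:" + m : "error:" + e.getClass().getSimpleName());

        // A future whose get() throws ExecutionException.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("simulated failure"));

        List<Thunk> thunks = new ArrayList<>();
        thunks.add(new Thunk(record, CompletableFuture.completedFuture("offset-0")));
        thunks.add(new Thunk(record, failed));
        completeAll(thunks);
        return seen;
    }

    public static void main(String[] args) {
        // Both callbacks are recorded: one success and one error.
        System.out.println(run());
    }
}
```

Separating the two steps also keeps the "error executing user-provided
callback" log message accurate, since it is then only reached when the
callback itself throws.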





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
