Bill Sobel created KAFKA-2060:
---------------------------------
Summary: Async onCompletion callback may not be called
Key: KAFKA-2060
URL: https://issues.apache.org/jira/browse/KAFKA-2060
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 0.8.1.2
Environment: All
Reporter: Bill Sobel
Priority: Critical
The 'done' function in RecordBatch.java attempts to enumerate and call each
onCompletion() callback. However the call to thunk.future.get() can throw an
exception. When this occurs the callback is not invoked. This appears to be
the only place where a callback per async send would not occur and the callback
orphaned.
The call to thunk.future.get() appears to need to occur in its own try/catch
and then the onCompletion called with the results if it doesn't throw an
exception or thunk.callback.onCompletion(null, recordException) if it does.
e.g.
/**
* Complete the request
*
* @param baseOffset The base offset of the messages assigned by the server
* @param exception The exception that occurred (or null if the request was
successful)
*/
public void done(long baseOffset, RuntimeException exception) {
this.produceFuture.done(topicPartition, baseOffset, exception);
log.trace("Produced messages to topic-partition {} with base offset
offset {} and error: {}.",
topicPartition,
baseOffset,
exception);
// execute callbacks
for (int i = 0; i < this.thunks.size(); i++) {
try {
Thunk thunk = this.thunks.get(i);
if (exception == null) {
RecordMetadata rc = null;
try {
rc = thunk.future.get();
}
catch(Exception recordException) {
thunk.callback.onCompletion(null,
recordException);
}
if(rc != null) {
thunk.callback.onCompletion(rc, null);
}
}
else {
thunk.callback.onCompletion(null, exception);
}
} catch (Exception e) {
log.error("Error executing user-provided callback on message
for topic-partition {}:", topicPartition, e);
}
}
}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)