Ewen Cheslack-Postava created KAFKA-2867:
--------------------------------------------

             Summary: Missing synchronization and improperly handled InterruptException in WorkerSourceTask
                 Key: KAFKA-2867
                 URL: https://issues.apache.org/jira/browse/KAFKA-2867
             Project: Kafka
          Issue Type: Bug
          Components: copycat
            Reporter: Ewen Cheslack-Postava
            Assignee: Ewen Cheslack-Postava
            Priority: Blocker
             Fix For: 0.9.0.0, 0.9.1.0


In WorkerSourceTask, finishSuccessfulFlush() is not synchronized. In one case
(when the flush didn't even have to be started), this is OK because the call is
already inside a synchronized block. In the other case, however, the call
happens outside the synchronized block.
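A minimal sketch of the intended invariant, assuming a simplified model of the task's bookkeeping: every method that reads or writes the two outstanding-message maps or the flushing flag, including finishSuccessfulFlush(), takes the same monitor. The class and the other method names (OutstandingTracker, recordSending, recordSent, beginFlush, flushDrained) are hypothetical and do not reproduce the actual WorkerSourceTask source.

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical, simplified model of the outstanding-message bookkeeping.
// Names are illustrative only; this is not the real WorkerSourceTask code.
class OutstandingTracker {
    private Map<Object, Object> outstanding = new IdentityHashMap<>();
    private Map<Object, Object> backlog = new IdentityHashMap<>();
    private boolean flushing = false;

    // Task thread, before handing a record to the producer. While a flush is
    // in progress, new records go to the backlog so the flush only waits on
    // records that were already outstanding when it started.
    synchronized void recordSending(Object record) {
        (flushing ? backlog : outstanding).put(record, record);
    }

    // Producer callback once the record is acked. Returns false when the
    // record is in neither map (the "critical" case described in the issue).
    synchronized boolean recordSent(Object record) {
        Object removed = outstanding.remove(record);
        if (removed == null && flushing) {
            removed = backlog.remove(record);
        }
        if (removed == null) {
            return false;
        }
        if (flushing && outstanding.isEmpty()) {
            notifyAll(); // wake a committer waiting for the flush to drain
        }
        return true;
    }

    synchronized void beginFlush() {
        flushing = true;
    }

    synchronized boolean flushDrained() {
        return outstanding.isEmpty();
    }

    // The fix: the swap and the flag reset happen under the same lock used by
    // recordSending() and recordSent(), so no record can be routed to the
    // backlog between the swap and clearing the flag.
    synchronized void finishSuccessfulFlush() {
        Map<Object, Object> tmp = outstanding;
        outstanding = backlog;
        backlog = tmp;
        flushing = false;
    }

    synchronized int outstandingCount() { return outstanding.size(); }
    synchronized int backlogCount() { return backlog.size(); }
}
```

Because both commit paths now end in a synchronized method, it no longer matters whether the caller happens to already hold the lock (the "flush didn't have to be started" case) or not.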

This caused transient failures of the system test that cleanly bounces copycat
nodes. The bug doesn't cause exceptions because finishSuccessfulFlush() only
swaps two maps and sets a flag to false. However, because those two maps track
outstanding messages, another thread may by chance be sending a message at the
same time. If that message gets added to the backlog queue and the flushing flag
is then cleared, the message is temporarily "lost" in the backlog queue. When
the producer callback for it arrives, the callback logs an error because it
can't find a record of the acked message (an error which, if it ever appears,
should be considered critical since it shouldn't be possible). On the next
commit, the message is swapped *back into place*. On the commit after that, the
flush can never complete because the message is in the outstanding list but has
already been acked. This, in turn, makes it impossible to commit offsets, and
results in duplicate messages even under clean bounces, where we should be able
to get exactly-once delivery assuming no network delays or other issues.
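To make the sequence concrete, here is a hypothetical single-threaded replay of one interleaving that produces these symptoms. It assumes the sender first checks the flushing flag and then inserts into the map that flag selects, and that the callback only consults the backlog while flushing; the unsynchronized finishSuccessfulFlush() runs between the sender's two steps, as a second thread could do without locking. All names are illustrative, not the real code.

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical replay of the race; everything runs on one thread so the
// interleaving is deterministic.
class RaceReplay {
    Map<Object, Object> outstanding = new IdentityHashMap<>();
    Map<Object, Object> backlog = new IdentityHashMap<>();
    boolean flushing = false;
    boolean ackFound;

    // Producer callback: the backlog is only consulted while flushing.
    boolean recordSent(Object record) {
        Object removed = outstanding.remove(record);
        if (removed == null && flushing) {
            removed = backlog.remove(record);
        }
        return removed != null; // false => "critical" error in the real task
    }

    // Unsynchronized version: swap the maps, then clear the flag.
    void finishSuccessfulFlush() {
        Map<Object, Object> tmp = outstanding;
        outstanding = backlog;
        backlog = tmp;
        flushing = false;
    }

    static RaceReplay replay() {
        RaceReplay t = new RaceReplay();
        Object msg = new Object();

        // Commit 1 is flushing; the sender sees flushing == true...
        t.flushing = true;
        boolean sawFlushing = t.flushing;
        // ...the committer finishes the flush without holding the lock...
        t.finishSuccessfulFlush();
        // ...and the sender puts the record into the (new) backlog map.
        (sawFlushing ? t.backlog : t.outstanding).put(msg, msg);

        // The ack arrives while not flushing, so the backlog is never checked.
        t.ackFound = t.recordSent(msg);

        // Commit 2: nothing outstanding, so the flush completes at once and the
        // swap moves the already-acked record back into the outstanding map.
        t.flushing = true;
        t.finishSuccessfulFlush();

        // Commit 3 starts; it would wait on t.outstanding, which never drains.
        t.flushing = true;
        return t;
    }
}
```

After replay(), ackFound is false (the callback could not find the record) and one already-acked record sits in the outstanding map, so the third flush could only time out.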

Seeing this error also made it apparent that WorkerSourceTaskThreads that do
not finish quickly enough were not being handled properly. The
ShutdownableThread should be interrupted if it does not complete in time, but
logs like the following appeared:

{quote}
[2015-11-18 01:02:13,897] INFO Stopping task verifiable-source-0 (org.apache.kafka.connect.runtime.Worker)
[2015-11-18 01:02:13,897] INFO Starting graceful shutdown of thread WorkerSourceTask-verifiable-source-0 (org.apache.kafka.connect.util.ShutdownableThread)
[2015-11-18 01:02:13,897] DEBUG WorkerSourceTask{id=verifiable-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2015-11-18 01:02:17,901] DEBUG Submitting 1 entries to backing store (org.apache.kafka.connect.storage.OffsetStorageWriter)
[2015-11-18 01:02:18,897] INFO Forcing shutdown of thread WorkerSourceTask-verifiable-source-0 (org.apache.kafka.connect.util.ShutdownableThread)
[2015-11-18 01:02:18,897] ERROR Graceful stop of task WorkerSourceTask{id=verifiable-source-0} failed. (org.apache.kafka.connect.runtime.Worker)
[2015-11-18 01:02:18,897] ERROR Failed to flush WorkerSourceTask{id=verifiable-source-0}, timed out while waiting for producer to flush outstanding messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2015-11-18 01:02:18,898] DEBUG Submitting 1 entries to backing store (org.apache.kafka.connect.storage.OffsetStorageWriter)
[2015-11-18 01:02:18,898] INFO Finished stopping tasks in preparation for rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
{quote}

The background thread performing the commit keeps acting after it has
supposedly been interrupted (note the second "Submitting 1 entries to backing
store" line after the forced shutdown). This is because InterruptedExceptions
thrown during the flush were being ignored (at one point they were not even
possible). Instead, an interrupt from the main thread shutting down the task
thread in preparation for a rebalance should be handled by failing the commit
and returning, so the thread can exit cleanly.
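A sketch of the proposed handling, assuming the flush waits on the task's monitor for outstanding messages to drain: an InterruptedException fails the commit and returns instead of being swallowed. FlushWaiter, add, ack, and awaitFlush are hypothetical names. Restoring the interrupt flag is one common choice so the enclosing thread loop can also see the shutdown request; the issue itself does not specify it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch; not the actual WorkerSourceTask code.
class FlushWaiter {
    private final Deque<Object> outstanding = new ArrayDeque<>();

    synchronized void add(Object record) {
        outstanding.add(record);
    }

    synchronized void ack(Object record) {
        outstanding.remove(record);
        if (outstanding.isEmpty()) {
            notifyAll(); // wake a committer blocked in awaitFlush()
        }
    }

    // Waits up to timeoutMs for outstanding records to drain.
    // Returns true on success, false on timeout or interruption.
    synchronized boolean awaitFlush(long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!outstanding.isEmpty()) { // loop also guards against spurious wakeups
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false; // timed out; never call wait(0), which waits forever
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                // The thread is being stopped: fail this commit and return so
                // the thread can exit, instead of ignoring the interrupt.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```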



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
