Ewen Cheslack-Postava created KAFKA-2867:
--------------------------------------------
Summary: Missing synchronization and improperly handled
InterruptException in WorkerSourceTask
Key: KAFKA-2867
URL: https://issues.apache.org/jira/browse/KAFKA-2867
Project: Kafka
Issue Type: Bug
Components: copycat
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
Priority: Blocker
Fix For: 0.9.0.0, 0.9.1.0
In WorkerSourceTask, finishSuccessfulFlush() is not synchronized. In one case
(when the flush didn't even need to be started), this is fine because the call
is already inside a synchronized block. However, the other call site is outside
any synchronized block.
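A simplified sketch of the racy pattern, with names modeled on WorkerSourceTask's outstanding-message tracking (the fields and method shapes here are illustrative, not the exact implementation):
{code:java}
// Hypothetical, simplified sketch of the racy pattern; names are modeled on
// WorkerSourceTask's outstanding-message tracking, not copied verbatim.
import java.util.IdentityHashMap;
import java.util.Map;

class FlushStateSketch<R> {
    private Map<R, R> outstandingMessages = new IdentityHashMap<>();
    private Map<R, R> outstandingMessagesBacklog = new IdentityHashMap<>();
    private boolean flushing = false;

    // Dispatch path: runs under the task lock, so the flag check and the put
    // are atomic with respect to other synchronized blocks.
    synchronized void recordDispatched(R record) {
        if (flushing)
            outstandingMessagesBacklog.put(record, record);
        else
            outstandingMessages.put(record, record);
    }

    // BUG: not synchronized. The map swap and flag update can interleave with
    // recordDispatched(), so a record routed to the backlog on a stale view of
    // the flushing flag can end up in the wrong map after the swap.
    void finishSuccessfulFlush() {
        Map<R, R> temp = outstandingMessages;
        outstandingMessages = outstandingMessagesBacklog;
        outstandingMessagesBacklog = temp;
        flushing = false;
    }
}
{code}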
The result of this was transient failures of the system test for clean bouncing
copycat nodes. The bug doesn't cause exceptions because finishSuccessfulFlush()
only swaps two maps and sets a flag to false. However, because the two maps
that track outstanding messages are swapped, we could by chance also be in the
middle of sending a message. If the message is added to the backlog queue just
as the maps are swapped and the flushing flag is toggled, we can temporarily
"lose" that message in the backlog queue. Then we'll get a callback that will
log an error because it can't find a record of the acked message (which, if it
ever appears, should be considered a critical issue since it shouldn't be
possible), and then on the next commit, it'll be swapped *back into place*. On
the subsequent commit, the flush will never be able to complete because the
message will be in the outstanding list, but will already have been acked.
This, in turn, makes it impossible to commit offsets and results in duplicate
messages even under clean bounces, where we should be able to get exactly-once
delivery assuming no network delays or other issues.
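A minimal sketch of the synchronization half of the fix, continuing the sketch above (illustrative only, not the actual patch): have finishSuccessfulFlush() take the same lock as the dispatch path.
{code:java}
// Continuing the sketch above: taking the same lock as recordDispatched()
// makes the flag check + put and the map swap mutually exclusive, so a record
// can no longer be routed into a map that is being swapped out from under it.
synchronized void finishSuccessfulFlush() {
    Map<R, R> temp = outstandingMessages;
    outstandingMessages = outstandingMessagesBacklog;
    outstandingMessagesBacklog = temp;
    flushing = false;
}
{code}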
As a result of seeing this error, it became apparent that the handling of
WorkerSourceTaskThreads that do not complete quickly enough was not working
properly. Such a ShutdownableThread should get interrupted if it fails to
finish in time, but logs like this would happen:
{quote}
[2015-11-18 01:02:13,897] INFO Stopping task verifiable-source-0
(org.apache.kafka.connect.runtime.Worker)
[2015-11-18 01:02:13,897] INFO Starting graceful shutdown of thread
WorkerSourceTask-verifiable-source-0
(org.apache.kafka.connect.util.ShutdownableThread)
[2015-11-18 01:02:13,897] DEBUG WorkerSourceTask{id=verifiable-source-0}
Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2015-11-18 01:02:17,901] DEBUG Submitting 1 entries to backing store
(org.apache.kafka.connect.storage.OffsetStorageWriter)
[2015-11-18 01:02:18,897] INFO Forcing shutdown of thread
WorkerSourceTask-verifiable-source-0
(org.apache.kafka.connect.util.ShutdownableThread)
[2015-11-18 01:02:18,897] ERROR Graceful stop of task
WorkerSourceTask{id=verifiable-source-0} failed.
(org.apache.kafka.connect.runtime.Worker)
[2015-11-18 01:02:18,897] ERROR Failed to flush
WorkerSourceTask{id=verifiable-source-0}, timed out while waiting for producer
to flush outstanding messages
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2015-11-18 01:02:18,898] DEBUG Submitting 1 entries to backing store
(org.apache.kafka.connect.storage.OffsetStorageWriter)
[2015-11-18 01:02:18,898] INFO Finished stopping tasks in preparation for
rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
{quote}
Actions in the background thread performing the commit continue to occur after
it has supposedly been interrupted. This is because InterruptedExceptions during
the flush were being ignored (some time ago they were not even possible).
Instead, any interruption by the main thread trying to shut down the task thread
in preparation for a rebalance should be handled by failing the commit operation
and returning, so the thread can exit cleanly.
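A rough sketch of the intended handling, assuming the commit path blocks on outstanding acks with a latch (the class, field, and timeout handling below are hypothetical, not the real WorkerSourceTask API):
{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: the latch and helper names are hypothetical.
class CommitSketch {
    private final CountDownLatch outstandingAcks = new CountDownLatch(1);

    boolean commitOffsets(long timeoutMs) {
        try {
            // Wait for the producer to ack all outstanding records.
            if (!outstandingAcks.await(timeoutMs, TimeUnit.MILLISECONDS))
                return false; // timed out; caller logs the flush failure
            // ... write the collected offsets to the backing store here ...
            return true;
        } catch (InterruptedException e) {
            // The worker interrupted us during shutdown: restore the interrupt
            // flag, fail the commit, and return so the thread can exit cleanly
            // instead of continuing the flush in the background.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
{code}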