Hi everyone,

We’ve been experimenting recently with some limited use of Kafka Connect and 
are hoping to expand to wider use cases soon. However, we had some internal 
issues that gave us a well-timed preview of error handling behavior in Kafka 
Connect. I think the fixes for this will require at least three different KIPs, 
but I want to share some thoughts to get the initial reaction from folks in the 
dev community. If these ideas seem reasonable, I can go ahead and create the 
required KIPs.

Specifically, here are the three things we ran into…

-----------

(1) Kafka Connect only retries tasks when certain exceptions are thrown
Currently, Kafka Connect only retries tasks when certain exceptions are thrown 
- I believe the logic checks whether the exception is specifically marked as 
“retryable” and, if not, fails the task. We’d like to bypass this behavior and 
implement a configurable exponential backoff for tasks regardless of the 
failure reason. This is probably two changes: one to implement exponential 
backoff retries for tasks if they don’t already exist, and another to introduce 
a RetryPolicy interface that evaluates the Exception to determine whether or 
not to retry.
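
To make that second change concrete, here is a rough sketch of the kind of 
interface I have in mind. The RetryPolicy name, the method signatures, and the 
exponential-backoff implementation below are all invented for illustration and 
don’t exist in Connect today:

public interface RetryPolicy {
    // Decide whether the framework should retry the task after this failure.
    boolean shouldRetry(Throwable error, int previousAttempts);

    // How long to wait before the next attempt.
    long backoffMs(int previousAttempts);
}

// Example policy: retry every failure, with capped exponential backoff.
class ExponentialBackoffRetryPolicy implements RetryPolicy {
    private final long initialBackoffMs;
    private final long maxBackoffMs;
    private final int maxAttempts;

    ExponentialBackoffRetryPolicy(long initialBackoffMs, long maxBackoffMs, int maxAttempts) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
        this.maxAttempts = maxAttempts;
    }

    @Override
    public boolean shouldRetry(Throwable error, int previousAttempts) {
        // Ignore whether the exception is marked retryable; just cap the attempts.
        return previousAttempts < maxAttempts;
    }

    @Override
    public long backoffMs(int previousAttempts) {
        // Double the wait on each attempt, up to a configurable ceiling.
        long backoff = initialBackoffMs * (1L << Math.min(previousAttempts, 30));
        return Math.min(backoff, maxBackoffMs);
    }
}

The framework would consult shouldRetry() whenever a task throws, instead of 
only retrying exceptions that are explicitly marked as retryable.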

(2) Kafka Connect doesn’t permit Connectors to smartly reposition after 
rebalance
We’re using the S3 connector to dump files with a large number of records into 
an S3 bucket - roughly 100,000 records per file. Unfortunately, every time a 
task fails, the resulting consumer rebalance causes all partitions to get 
re-shuffled amongst the tasks. To compensate, from what I can tell in the logs, 
the connector tasks get stopped and restarted, and then pick up from the last 
consumer position that was committed to the brokers.

This doesn’t work well if you’re batching records into large files for 
archival.

For the S3 connector, for example: let’s say I have two partitions and the 
connector has two tasks, one for each partition. Task 0 has read 5,000 records 
since its last commit and Task 1 has read 70,000 records since its last commit. 
Then, boom, something goes wrong with Task 0 and it falls over. This triggers a 
rebalance and Task 1 has to take over the workload. At that point, Task 1 
discards the 70,000 records in its buffer and starts over from the last commit 
point. This failure mode is brutal for the archival system we’re building.

There are two solutions that I can think of to this:

(A) Provide an interface for connectors to define their own rebalance listener. 
This listener could compare the newly assigned list of partitions with a 
previously assigned list. For all partitions that this connector was already 
working on prior to the rebalance, it could manually seek to the last position 
it locally processed before resuming. So, in the scenario above, Task 1 could 
keep an accounting file locally and seek over the first 70,000 records without 
reprocessing them. It would then wait until after it confirms the S3 upload to 
commit those offsets back to Kafka. This ensures that if the machine running 
Task 1 dies a new consumer can take its place, but we’ll still benefit from a 
local cache if one is present.
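
For (A), here’s a rough sketch of what I’m picturing, building on the existing 
SinkTask.open() and SinkTaskContext.offset() hooks. The RepositioningSinkTask 
class and the locallyProcessedOffset() accounting hook are made up for 
illustration - the point is just that a task seeks forward to whatever it has 
already processed locally instead of replaying from the committed offset:

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical base class: on (re)assignment, seek each partition forward to
// the position this task has already processed locally (e.g. from an accounting
// file kept next to the buffered S3 data) instead of replaying from the last
// committed offset.
public abstract class RepositioningSinkTask extends SinkTask {

    // Made-up hook: return the last offset processed locally for this
    // partition, or null if nothing is tracked locally.
    protected abstract Long locallyProcessedOffset(TopicPartition tp);

    @Override
    public void open(Collection<TopicPartition> partitions) {
        Map<TopicPartition, Long> seekTo = new HashMap<>();
        for (TopicPartition tp : partitions) {
            Long processed = locallyProcessedOffset(tp);
            if (processed != null) {
                // Resume just past the last locally processed record.
                seekTo.put(tp, processed + 1);
            }
        }
        if (!seekTo.isEmpty()) {
            // Ask the framework to reset the consumer's position for these partitions.
            context.offset(seekTo);
        }
    }
}

Offsets would still only be committed back to Kafka after the S3 upload is 
confirmed, so a brand-new worker with no local state falls back to the 
committed position just as it does today.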

(B) Have Connect manually round-robin partitions on a topic to tasks and never 
rebalance them automatically. If this were combined with better task retry 
semantics, I think this solution would be simpler.
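
To illustrate the idea behind (B) with the plain consumer API (this is not how 
Connect wires up its sink consumers today, and the topic name and task-numbering 
scheme below are made up): assign() pins partitions to a consumer and never 
triggers a group rebalance, so a task failure elsewhere wouldn’t disturb anyone 
else’s buffer.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class StaticAssignmentSketch {
    public static void main(String[] args) {
        int taskId = Integer.parseInt(args[0]);     // e.g. 0 or 1
        int taskCount = Integer.parseInt(args[1]);  // e.g. 2

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Round-robin the topic's partitions across tasks by partition number.
            List<TopicPartition> mine = new ArrayList<>();
            for (PartitionInfo p : consumer.partitionsFor("archive-topic")) {
                if (p.partition() % taskCount == taskId) {
                    mine.add(new TopicPartition(p.topic(), p.partition()));
                }
            }
            // Static assignment: no group membership, so no automatic rebalances.
            consumer.assign(mine);
            // ... poll() and upload to S3 as usual ...
        }
    }
}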

(3) As far as I can tell, JMX metrics aren’t reporting the number of active 
tasks
This one is arguably the simplest issue to resolve, but we’d like to alert if 
the number of active tasks isn’t what we expect it to be so that we can have a 
human investigate.
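
For illustration, this is roughly the check we’d like to be able to run against 
a worker. The MBean name and the "active-task-count" attribute below are 
hypothetical - as far as I can tell nothing like them is exposed today, which is 
exactly the gap:

import java.lang.management.ManagementFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ActiveTaskCheck {
    public static void main(String[] args) throws Exception {
        // Sketch assumes it runs inside the worker JVM; a real alerter would
        // attach over a remote JMX connection instead.
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();

        // Hypothetical MBean and attribute; these names are invented.
        ObjectName name = new ObjectName("kafka.connect:type=connector-metrics,connector=s3-sink");
        int expectedTasks = 2;

        Number activeTasks = (Number) server.getAttribute(name, "active-task-count");
        if (activeTasks == null || activeTasks.intValue() != expectedTasks) {
            // In practice this would page a human rather than print.
            System.err.println("Expected " + expectedTasks + " active tasks but saw " + activeTasks);
        }
    }
}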

-----------

I would love thoughts on all of the above from anyone on this list.

Thanks,

Matt Farmer
