[ 
https://issues.apache.org/jira/browse/KAFKA-14401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-14401:
------------------------------
    Component/s: KafkaConnect

> Connector/Tasks reading offsets can get stuck if underneath WorkThread dies
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-14401
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14401
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Sagar Rao
>            Priority: Major
>              Labels: KafkaConnect
>
> When a connector or task reads its offsets from the offsets topic, it calls 
> `OffsetStorageReaderImpl#offsets`. This method gets a Future from the 
> underlying KafkaOffsetBackingStore, which invokes `KafkaBasedLog#readToEnd` 
> and passes it a Callback. `readToEnd` adds the Callback to a queue of 
> pending callbacks.
> Within KafkaBasedLog, a WorkThread polls this queue and executes the 
> callbacks in an infinite loop. However, the while loop is wrapped in an 
> enclosing try/catch block. If an exception is thrown that none of the inner 
> catch blocks handles, control passes to the outermost catch block and the 
> WorkThread terminates. The connectors/tasks are not aware of this and keep 
> submitting callbacks to KafkaBasedLog with nothing left to process them, so 
> they block forever. 
> This can be seen in the thread dumps as well:
>  
> {code:java}
> "task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s tid=0x00007f8d9c037000 nid=0x5d00 waiting on condition  [0x00007f8dc08cd000]
>    java.lang.Thread.State: WAITING (parking)
>     at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
>     - parking to wait for  <0x000000070345c9a8> (a java.util.concurrent.CountDownLatch$Sync)
>     at java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
>     at java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
>     at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
>     at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
>     at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
>  {code}
>  
> We need a mechanism to restart the WorkThread if it dies. This could be done 
> in the outermost catch block for example.
>  
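
The restart idea in the description could be sketched roughly as follows. This is only an illustration under stated assumptions, not the KafkaBasedLog code or a proposed patch: `RestartingWorker`, `submit`, `restarts` and the 100 ms poll interval are hypothetical names and values chosen for the example. It shows a worker loop whose outermost catch block starts a replacement thread, so that callbacks queued after a failure are still processed.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a callback-draining worker; NOT the KafkaBasedLog code.
final class RestartingWorker {
    private final BlockingQueue<Runnable> callbacks = new LinkedBlockingQueue<>();
    private final AtomicInteger restarts = new AtomicInteger();
    private volatile boolean stopRequested = false;
    private volatile Thread thread;

    // Starts (or restarts) the thread that drains the callback queue.
    synchronized void start() {
        Thread t = new Thread(this::runLoop, "work-thread-" + restarts.get());
        t.setDaemon(true);
        thread = t;
        t.start();
    }

    void submit(Runnable callback) {
        callbacks.add(callback);
    }

    int restarts() {
        return restarts.get();
    }

    void stop() throws InterruptedException {
        stopRequested = true;
        Thread t = thread;
        if (t != null) {
            t.interrupt();
            t.join(1000);
        }
    }

    private void runLoop() {
        try {
            while (!stopRequested) {
                Runnable callback = callbacks.poll(100, TimeUnit.MILLISECONDS);
                if (callback != null) {
                    callback.run();
                }
            }
        } catch (InterruptedException e) {
            // Normal shutdown path: restore the flag and let the thread end.
            Thread.currentThread().interrupt();
        } catch (Throwable t) {
            // Outermost catch: the loop has exited. Without the restart below,
            // later submissions would sit in the queue forever.
            // Assumption: every failure is retried; a real fix would have to
            // decide which failures are safe to retry and how to fail the
            // callback that was in flight.
            if (!stopRequested) {
                restarts.incrementAndGet();
                start();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        RestartingWorker worker = new RestartingWorker();
        worker.start();
        // The first callback kills the loop; the second one is picked up
        // by the replacement thread.
        worker.submit(() -> { throw new IllegalStateException("simulated failure"); });
        CountDownLatch done = new CountDownLatch(1);
        worker.submit(done::countDown);
        boolean ran = done.await(5, TimeUnit.SECONDS);
        System.out.println("second callback ran: " + ran + ", restarts: " + worker.restarts());
        worker.stop();
    }
}
```

Note that the callback which threw is simply dropped in this sketch, so a caller waiting on it would still hang. Completing such callbacks with an error, or bounding the wait on the caller side, would be a separate design decision.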



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
