[ https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Gustafson resolved KAFKA-7280.
------------------------------------
Resolution: Fixed
Fix Version/s: (was: 1.1.2)
> ConcurrentModificationException in FetchSessionHandler in heartbeat thread
> --------------------------------------------------------------------------
>
> Key: KAFKA-7280
> URL: https://issues.apache.org/jira/browse/KAFKA-7280
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 1.1.1, 2.0.0
> Reporter: Rajini Sivaram
> Assignee: Rajini Sivaram
> Priority: Critical
> Fix For: 2.0.1, 2.1.0
>
>
> Request/response handling in FetchSessionHandler is not thread-safe, but the
> Kafka consumer uses it without any synchronization, even though poll() on the
> heartbeat thread can process responses. The heartbeat thread holds the
> coordinator lock while processing its poll and responses, which makes other
> operations involving the group coordinator safe. We also need to lock
> FetchSessionHandler for the operations that update or read
> FetchSessionHandler#sessionPartitions.
> This exception is from a system test run on trunk of
> TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two:
> {quote}[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer,
> groupId=group] Heartbeat thread failed due to unexpected error
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> java.util.ConcurrentModificationException
> at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
> at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
> at org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362)
> at org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424)
> at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216)
> at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996)
> {quote}
>
> The logs just prior to the exception show that a partition was removed from
> the session:
> {quote}[2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer,
> groupId=group] Skipping fetch for partition test_topic-1 because there is an
> in-flight request to worker4:9095 (id: 3 rack: null)
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer,
> groupId=group] Completed receive from node 2 for FETCH with correlation id
> 417, received
> {throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header={partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 bytes, value=3 bytes))]}]}]}
> (org.apache.kafka.clients.NetworkClient)
> [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer,
> groupId=group] Added READ_UNCOMMITTED fetch request for partition
> test_topic-0 at offset 189 to node worker3:9095 (id: 2 rack: null)
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer,
> groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for
> node 2. Added (), altered (), removed (test_topic-2) out of (test_topic-0)
> (org.apache.kafka.clients.FetchSessionHandler)
> [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer,
> groupId=group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(),
> toForget=(test_topic-2), implied=(test_topic-0)) to broker worker3:9095 (id:
> 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer,
> groupId=group] Sending FETCH
> {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=109800960,epoch=237,topics=[],forgotten_topics_data=[{topic=test_topic,partitions=[2]}]}
> with correlation id 418 to node 2 (org.apache.kafka.clients.NetworkClient)
> [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer,
> groupId=group] Skipping fetch for partition test_topic-2 because there is an
> in-flight request to worker3:9095 (id: 2 rack: null)
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer,
> groupId=group] Heartbeat thread failed due to unexpected error
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> java.util.ConcurrentModificationException
> {quote}
> The sequence in the logs shows:
> # FETCH response received
> # FetchSessionHandler#sessionPartitions is updated (a partition is removed)
> # New FETCH request is sent
> # Heartbeat thread throws ConcurrentModificationException while iterating
> over FetchSessionHandler#sessionPartitions
> This could be because steps 1 and 4 ran on the heartbeat thread, while steps 2
> and 3 ran on the thread processing Consumer#poll().
>
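The failure mode described above can be illustrated with a minimal sketch. It is not Kafka code: the class SessionPartitionsSketch, the nested GuardedPartitions type, and all method names are hypothetical, and the partition offsets are illustrative values. The first method reproduces the exception deterministically on a single thread by structurally modifying a LinkedHashMap while iterating over it, which is the same fail-fast check that a racing second thread would trip. The second method assumes one possible remedy, taking a single lock around every read and update of the map, and runs a mutator and a reader thread against it.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

class SessionPartitionsSketch {

    // Hypothetical stand-in for a handler that owns a partition map.
    // Every read and update takes the same monitor lock.
    static final class GuardedPartitions {
        private final Map<String, Long> sessionPartitions = new LinkedHashMap<>();

        synchronized void update(String partition, long offset) {
            sessionPartitions.put(partition, offset);
        }

        synchronized void forget(String partition) {
            sessionPartitions.remove(partition);
        }

        // Iterates the map (as a log-string helper would) while holding the lock.
        synchronized String describe() {
            StringBuilder sb = new StringBuilder("(");
            String sep = "";
            for (String partition : sessionPartitions.keySet()) {
                sb.append(sep).append(partition);
                sep = ", ";
            }
            return sb.append(")").toString();
        }
    }

    // Single-threaded reproduction: removing an entry during iteration makes
    // the fail-fast iterator throw on its next step.
    static boolean unguardedIterationFails() {
        Map<String, Long> partitions = new LinkedHashMap<>();
        partitions.put("test_topic-0", 189L);
        partitions.put("test_topic-1", 0L);   // illustrative offset
        partitions.put("test_topic-2", 183L); // illustrative offset
        try {
            for (String partition : partitions.keySet()) {
                partitions.remove("test_topic-2"); // structural modification
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Two threads share a GuardedPartitions: one adds and removes a partition,
    // the other iterates. With the lock, the reader never sees a half-updated map.
    static boolean guardedAccessSurvives() {
        GuardedPartitions guarded = new GuardedPartitions();
        guarded.update("test_topic-0", 189L);
        AtomicBoolean failed = new AtomicBoolean(false);

        Thread mutator = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                guarded.update("test_topic-2", i);
                guarded.forget("test_topic-2");
            }
        });
        Thread reader = new Thread(() -> {
            try {
                for (int i = 0; i < 10_000; i++) {
                    guarded.describe();
                }
            } catch (ConcurrentModificationException e) {
                failed.set(true);
            }
        });

        mutator.start();
        reader.start();
        try {
            mutator.join();
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !failed.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded iteration failed: " + unguardedIterationFails());
        System.out.println("guarded access survived: " + guardedAccessSurvives());
    }
}
```

Locking inside the map's owner is only one option. Holding an outer lock in the caller around each request-build and response-handling step would serialize the same operations, and which of the two the actual fix uses is not stated in this message.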
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)