[ https://issues.apache.org/jira/browse/KAFKA-6260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271380#comment-16271380 ]
ASF GitHub Bot commented on KAFKA-6260:
---------------------------------------

GitHub user hachikuji opened a pull request:

    https://github.com/apache/kafka/pull/4276

    KAFKA-6260: Ensure selection keys are removed from all collections on socket close

    When a socket is closed, we must remove the corresponding selection keys from internal collections. This fixes an NPE caused by attempting to access the selection key's attached channel after it had been cleared on disconnect.

    ### Committer Checklist (excluded from commit message)
    - [ ] Verify design and implementation
    - [ ] Verify test coverage and CI build status
    - [ ] Verify documentation (including upgrade notes)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hachikuji/kafka KAFKA-6260

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/4276.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4276

----
commit e715e673b7bca14e2a26a998348528d27ac8a9c8
Author: Jason Gustafson <ja...@confluent.io>
Date:   2017-11-29T19:10:39Z

    KAFKA-6260: Ensure selection keys are removed from all collections on socket close

----

> AbstractCoordinator not clearly handles NULL Exception
> ------------------------------------------------------
>
> Key: KAFKA-6260
> URL: https://issues.apache.org/jira/browse/KAFKA-6260
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 1.0.0
> Environment: RedHat Linux
> Reporter: Seweryn Habdank-Wojewodzki
> Assignee: Jason Gustafson
>
> The error reporting is not clear, but it seems that the Kafka heartbeat
> thread shuts down the application due to a NULL exception caused by "fake"
> disconnections.
> One more comment: we are processing messages in the stream, but sometimes we
> have to block processing for minutes, because the consumers cannot handle too
> much load. Is it possible that when the stream is waiting, the heartbeat is
> blocked as well?
> Can you check that?
> {code}
> 2017-11-23 23:54:47 DEBUG AbstractCoordinator:177 - [Consumer
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
> groupId=kafka-endpoint] Received successful Heartbeat response
> 2017-11-23 23:54:50 DEBUG AbstractCoordinator:183 - [Consumer
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
> groupId=kafka-endpoint] Sending Heartbeat request to coordinator
> cljp01.eb.lan.at:9093 (id: 2147483646 rack: null)
> 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
> groupId=kafka-endpoint] Sending HEARTBEAT
> {group_id=kafka-endpoint,generation_id=3834,member_id=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer-94f18be5-e49a-4817-9e5a-fe82a64e0b08}
> with correlation id 24 to node 2147483646
> 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
> groupId=kafka-endpoint] Completed receive from node 2147483646 for HEARTBEAT
> with correlation id 24, received {throttle_time_ms=0,error_code=0}
> 2017-11-23 23:54:50 DEBUG AbstractCoordinator:177 - [Consumer
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
> groupId=kafka-endpoint] Received successful Heartbeat response
> 2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
> groupId=kafka-endpoint] Disconnecting from node 1 due to request timeout.
> 2017-11-23 23:54:52 TRACE NetworkClient:135 - [Consumer
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
> groupId=kafka-endpoint] Cancelled request
> {replica_id=-1,max_wait_time=6000,min_bytes=1,max_bytes=52428800,isolation_level=0,topics=[{topic=clj_internal_topic,partitions=[{partition=6,fetch_offset=211558395,log_start_offset=-1,max_bytes=1048576},{partition=8,fetch_offset=210178209,log_start_offset=-1,max_bytes=1048576},{partition=0,fetch_offset=209353523,log_start_offset=-1,max_bytes=1048576},{partition=2,fetch_offset=209291462,log_start_offset=-1,max_bytes=1048576},{partition=4,fetch_offset=210728595,log_start_offset=-1,max_bytes=1048576}]}]}
> with correlation id 21 due to node 1 being disconnected
> 2017-11-23 23:54:52 DEBUG ConsumerNetworkClient:195 - [Consumer
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
> groupId=kafka-endpoint] Cancelled FETCH request RequestHeader(apiKey=FETCH,
> apiVersion=6,
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
> correlationId=21) with correlation id 21 due to node 1 being disconnected
> 2017-11-23 23:54:52 DEBUG Fetcher:195 - [Consumer
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
> groupId=kafka-endpoint] Fetch request
> {clj_internal_topic-6=(offset=211558395, logStartOffset=-1,
> maxBytes=1048576), clj_internal_topic-8=(offset=210178209, logStartOffset=-1,
> maxBytes=1048576), clj_internal_topic-0=(offset=209353523, logStartOffset=-1,
> maxBytes=1048576), clj_internal_topic-2=(offset=209291462, logStartOffset=-1,
> maxBytes=1048576), clj_internal_topic-4=(offset=210728595, logStartOffset=-1,
> maxBytes=1048576)} to cljp01.eb.lan.at:9093 (id: 1 rack: DC-1) failed
> org.apache.kafka.common.errors.DisconnectException: null
> 2017-11-23 23:54:52 TRACE NetworkClient:123 - [Consumer
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
> groupId=kafka-endpoint] Found least loaded node cljp01.eb.lan.at:9093 (id: 1
> rack: DC-1)
> 2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
> groupId=kafka-endpoint] Initialize connection to node cljp01.eb.lan.at:9093
> (id: 1 rack: DC-1) for sending metadata request
> 2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
> groupId=kafka-endpoint] Initiating connection to node cljp01.eb.lan.at:9093
> (id: 1 rack: DC-1)
> 2017-11-23 23:54:52 ERROR AbstractCoordinator:296 - [Consumer
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
> groupId=kafka-endpoint] Heartbeat thread for group kafka-endpoint failed due
> to unexpected error
> java.lang.NullPointerException: null
> at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:436)
> ~[my-kafka-endpoint.jar:?]
> at org.apache.kafka.common.network.Selector.poll(Selector.java:395)
> ~[my-kafka-endpoint.jar:?]
> at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
> ~[my-kafka-endpoint.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
> ~[my-kafka-endpoint.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:275)
> ~[my-kafka-endpoint.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:934)
> [my-kafka-endpoint.jar:?]
> 2017-11-23 23:54:52 DEBUG AbstractCoordinator:177 - [Consumer
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
> groupId=kafka-endpoint] Heartbeat thread has closed
> 2017-11-23 23:55:16 INFO KafkaElasticsearchEndpoint:61 - Got shutdown
> requests ...
> 2017-11-23 23:55:16 INFO StreamProcessor:106 - Streams closing ...
> 2017-11-23 23:55:16 INFO StreamProcessor:111 - with 5 [s] timeout
> 2017-11-23 23:55:16 DEBUG KafkaStreams:183 - stream-client
> [kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31]Stopping Streams client
> with timeoutMillis = 5000 ms.
> 2017-11-23 23:55:16 INFO KafkaStreams:346 - stream-client
> [kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31]State transition from
> RUNNING to PENDING_SHUTDOWN
> 2017-11-23 23:55:16 INFO StreamThread:336 - stream-thread
> [kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1] Informed
> to shut down
> 2017-11-23 23:55:16 INFO StreamThread:346 - stream-thread
> [kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1] State
> transition from RUNNING to PENDING_SHUTDOWN
> {code}
> Kafka settings:
> {code}
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [xxx:9093, yyy:9093]
> check.crcs = true
> client.id =
> kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 6000
> fetch.min.bytes = 1
> group.id = kafka-endpoint
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> internal.leave.group.on.close = false
> isolation.level = read_uncommitted
> key.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 10000000
> max.poll.records = 2000
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy =
> [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 7000
> retry.backoff.ms = 100
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = SSL
> send.buffer.bytes = 131072
> session.timeout.ms = 6000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = [hidden]
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = /data/my/etc/kafka/client-keystore
> ssl.keystore.password = [hidden]
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = SHA1PRNG
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = /data/my/etc/kafka/truststore
> ssl.truststore.password = [hidden]
> ssl.truststore.type = JKS
> value.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
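
For readers following the pull request description, the idea of removing a closed connection's selection key from every internal collection can be illustrated with a minimal sketch. This is not the Kafka Selector code. The KeyRegistry class, its pendingKeys set, the Connection holder, and all method names are hypothetical, and a java.nio Pipe stands in for a broker socket so the example needs no network. It only shows why a key that lingers in a secondary collection after its attachment was cleared can lead to a NullPointerException on the next poll.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not the Kafka Selector implementation.
public class KeyRegistry implements AutoCloseable {

    // Attachment stored on each selection key (assumed shape).
    static final class Connection {
        final String id;
        final Pipe pipe;

        Connection(String id, Pipe pipe) {
            this.id = id;
            this.pipe = pipe;
        }
    }

    private final Selector nioSelector;
    private final Map<String, SelectionKey> keysById = new HashMap<>();
    // Secondary collection that a later poll walks through.
    private final Set<SelectionKey> pendingKeys = new HashSet<>();

    public KeyRegistry() throws IOException {
        this.nioSelector = Selector.open();
    }

    public void register(String id) throws IOException {
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);
        SelectionKey key = pipe.source().register(
                nioSelector, SelectionKey.OP_READ, new Connection(id, pipe));
        keysById.put(id, key);
        pendingKeys.add(key);
    }

    public void closeConnection(String id) throws IOException {
        SelectionKey key = keysById.remove(id);
        if (key == null) {
            return;
        }
        Connection conn = (Connection) key.attachment();
        key.cancel();
        key.attach(null); // the attachment is cleared on close
        // The step this sketch is about: drop the key from every other
        // collection too, so no later pass sees a key without an attachment.
        pendingKeys.remove(key);
        conn.pipe.source().close();
        conn.pipe.sink().close();
    }

    public List<String> pollPending() {
        List<String> ids = new ArrayList<>();
        for (SelectionKey key : pendingKeys) {
            Connection conn = (Connection) key.attachment();
            // If closeConnection() had left the key in pendingKeys, conn
            // would be null here and conn.id would throw an NPE.
            ids.add(conn.id);
        }
        pendingKeys.clear();
        return ids;
    }

    @Override
    public void close() throws IOException {
        for (String id : new ArrayList<>(keysById.keySet())) {
            closeConnection(id);
        }
        nioSelector.close();
    }

    public static void main(String[] args) throws IOException {
        try (KeyRegistry registry = new KeyRegistry()) {
            registry.register("a");
            registry.register("b");
            registry.closeConnection("a");
            System.out.println(registry.pollPending());
        }
    }
}
```

Commenting out the pendingKeys.remove(key) line in closeConnection() makes pollPending() dereference a null attachment, which is the general shape of failure the stack trace above points to in Selector.pollSelectionKeys.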