[
https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553415#comment-17553415
]
Qingsheng Ren commented on FLINK-27962:
---------------------------------------
Thanks for the discussion [~igaevd] [~martijnvisser]! At first glance this looks
like a problem on the broker side, because {{CoordinatorNotAvailableException}}
should be an error returned by the broker, but it is odd that the consumer hangs
after this error. It would be great to have your JM and TM logs, as suggested by
[~martijnvisser]. You can also enable TRACE-level logging for the Kafka
consumer to reveal more details about the root cause.
The reason other applications such as Storm don't fail at the same time might be
that they use different group IDs, so their group coordinators could be on
different brokers. This is only an assumption on my part, and we still need the
logs to investigate the issue.
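To illustrate why different group IDs can end up with different coordinators:
as far as I understand it, the broker picks the coordinator for a group as the
leader of the {{__consumer_offsets}} partition given by
abs(groupId.hashCode()) % offsets.topic.num.partitions (50 by default). Below is
a minimal sketch of that mapping in plain Java. The class name, method names and
the example group IDs are hypothetical, and this is not Kafka's own code.

```java
// Sketch only: mirrors how a Kafka broker is understood to map a group ID to a
// partition of the internal __consumer_offsets topic. All names here are
// hypothetical and chosen for illustration.
final class CoordinatorPartition {

    // Broker default for offsets.topic.num.partitions.
    static final int DEFAULT_OFFSETS_TOPIC_PARTITIONS = 50;

    // Non-negative absolute value: Integer.MIN_VALUE maps to 0, because
    // Math.abs(Integer.MIN_VALUE) would overflow and stay negative.
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // Partition of __consumer_offsets whose leader acts as group coordinator.
    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        return abs(groupId.hashCode()) % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        // Made-up group IDs; pass real ones as arguments to compare them.
        String[] groupIds = args.length > 0
                ? args
                : new String[] {"flink-job-group", "storm-topology-group"};
        for (String groupId : groupIds) {
            System.out.printf("%s -> __consumer_offsets-%d%n", groupId,
                    partitionFor(groupId, DEFAULT_OFFSETS_TOPIC_PARTITIONS));
        }
    }
}
```

If two groups map to partitions led by different brokers, an outage of one
coordinator would affect only one of them. Two groups can still share a
coordinator, so this does not rule out other causes.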
> KafkaSourceReader fails to commit consumer offsets for checkpoints
> ------------------------------------------------------------------
>
> Key: FLINK-27962
> URL: https://issues.apache.org/jira/browse/FLINK-27962
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.15.0, 1.14.4
> Reporter: Dmytro
> Priority: Major
>
> The KafkaSourceReader works well for many hours, then fails and reconnects
> successfully, then continues to work for some time. After the first three
> failures it hangs on "Offset commit failed" and never connects again.
> Restarting the Flink job helps, and it works until the next three failures.
> I am aware of [the
> note|https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#consumer-offset-committing]
> that the Kafka source does NOT rely on committed offsets for fault tolerance.
> Committing offsets only exposes the progress of the consumer and the consumer
> group for monitoring.
> I agree with that as long as the failures are only periodic, but I would argue
> that complete failures are unacceptable.
> *Failed to commit consumer offsets for checkpoint:*
> {code:java}
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
> 2022-06-06 14:19:52,297 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464521
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
> 2022-06-06 14:20:02,297 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464522
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
> 2022-06-06 14:20:02,297 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464523
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets
> ..... fails permanently until the job restart
> {code}
> *Consumer Config:*
> {code:java}
> allow.auto.create.topics = true
> auto.commit.interval.ms = 5000
> auto.offset.reset = none
> bootstrap.servers = [test.host.net:9093]
> check.crcs = true
> client.dns.lookup = use_all_dns_ips
> client.id = test-client-id
> client.rack =
> connections.max.idle.ms = 180000
> default.api.timeout.ms = 60000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = test-group-id
> group.instance.id = null
> heartbeat.interval.ms = 3000
> interceptor.classes = []
> internal.leave.group.on.close = true
> internal.throw.on.fetch.stable.offset.unsupported = false
> isolation.level = read_uncommitted
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 500
> metadata.max.age.ms = 180000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 60000
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = [hidden]
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = class com.test.kafka.security.AzureAuthenticateCallbackHandler
> sasl.login.class = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.mechanism = OAUTHBEARER
> security.protocol = SASL_SSL
> security.providers = null
> send.buffer.bytes = 131072
> session.timeout.ms = 30000
> socket.connection.setup.timeout.max.ms = 30000
> socket.connection.setup.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2]
> ssl.endpoint.identification.algorithm = https
> ssl.engine.factory.class = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.certificate.chain = null
> ssl.keystore.key = null
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLSv1.2
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.certificates = null
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> {code}