[
https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17552607#comment-17552607
]
Martijn Visser commented on FLINK-27962:
----------------------------------------
[~igaevd] More than happy to (try to) help :)
In most situations that I've seen, these issues are related to setup. It could
be a Kafka issue (given FLINK-27963, perhaps Kafka is actively blocking the
Flink job at that moment), a network issue, a JVM issue, a pod issue; there
are so many things that can cause this type of error. Before concluding that
there's a bug on the Flink side, more investigation is needed.
It's understandable that you're pointing to Flink; my counterargument would be
that if this were a real bug in Flink, we would have seen a lot more bug
reports from users.
Do you have the JobManager and TaskManager logs? Perhaps we can see something
else that helps us understand what's going on and pinpoint the issue (it could
still be a bug, of course).
> KafkaSourceReader fails to commit consumer offsets for checkpoints
> ------------------------------------------------------------------
>
> Key: FLINK-27962
> URL: https://issues.apache.org/jira/browse/FLINK-27962
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.15.0, 1.14.4
> Reporter: Dmytro
> Priority: Major
>
> The KafkaSourceReader works well for many hours, then fails and reconnects
> successfully, then continues to work for some time. After the first three
> failures it hangs on "Offset commit failed" and never reconnects. Restarting
> the Flink job helps, and it works until the next run of three failures.
> I am aware of [the note|https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#consumer-offset-committing]
> that the Kafka source does NOT rely on committed offsets for fault tolerance;
> committing offsets only exposes the progress of the consumer and consumer
> group for monitoring.
> That would be fine if the failures were only occasional, but I would argue
> that failing permanently is unacceptable.
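> (As a side note on the monitoring use: below is a minimal sketch of reading
> those committed offsets with the Kafka AdminClient, assuming the group id and
> broker from the config further down; a real probe against this cluster would
> also need the SASL_SSL/OAUTHBEARER client properties.)
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.clients.admin.AdminClient;
> import org.apache.kafka.clients.admin.AdminClientConfig;
>
> public class CommittedOffsetsProbe {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         // Broker from the consumer config below; security settings omitted.
>         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "test.host.net:9093");
>         try (AdminClient admin = AdminClient.create(props)) {
>             // The checkpoint-time commits exist only to make this visible;
>             // the source restores from checkpointed offsets, not from these.
>             admin.listConsumerGroupOffsets("test-group-id")
>                  .partitionsToOffsetAndMetadata()
>                  .get()
>                  .forEach((tp, oam) -> System.out.printf("%s -> %d%n", tp, oam.offset()));
>         }
>     }
> }
> {code}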
> *Failed to commit consumer offsets for checkpoint:*
> {code:java}
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
> 2022-06-06 14:19:52,297 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464521
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
> 2022-06-06 14:20:02,297 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464522
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
> 2022-06-06 14:20:02,297 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464523
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
> ..... fails permanently until the job restart
> {code}
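> (For reference, the "You should retry committing" advice in the message comes
> from the Kafka client itself. Acting on it outside Flink would look roughly
> like the sketch below; this is illustrative only, not the KafkaSourceReader's
> code, which, as the log shows, just WARNs once per checkpoint.)
> {code:java}
> import java.util.Map;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
> import org.apache.kafka.common.TopicPartition;
>
> public final class RetryingCommit {
>     // Commit asynchronously and retry once on a retriable failure. With a
>     // coordinator that never comes back, as in this report, the retry fails
>     // as well, so real code would bound and back off the retries.
>     public static void commitWithRetry(KafkaConsumer<?, ?> consumer,
>                                        Map<TopicPartition, OffsetAndMetadata> offsets) {
>         consumer.commitAsync(offsets, (committed, exception) -> {
>             if (exception instanceof RetriableCommitFailedException) {
>                 consumer.commitAsync(offsets, null); // single retry, no callback
>             } else if (exception != null) {
>                 // Non-retriable: surface it (alert/fail) rather than retry.
>                 exception.printStackTrace();
>             }
>         });
>     }
> }
> {code}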
> *Consumer Config:*
> {code:java}
> allow.auto.create.topics = true
> auto.commit.interval.ms = 5000
> auto.offset.reset = none
> bootstrap.servers = [test.host.net:9093]
> check.crcs = true
> client.dns.lookup = use_all_dns_ips
> client.id = test-client-id
> client.rack =
> connections.max.idle.ms = 180000
> default.api.timeout.ms = 60000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = test-group-id
> group.instance.id = null
> heartbeat.interval.ms = 3000
> interceptor.classes = []
> internal.leave.group.on.close = true
> internal.throw.on.fetch.stable.offset.unsupported = false
> isolation.level = read_uncommitted
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 500
> metadata.max.age.ms = 180000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 60000
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = [hidden]
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = class com.test.kafka.security.AzureAuthenticateCallbackHandler
> sasl.login.class = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.mechanism = OAUTHBEARER
> security.protocol = SASL_SSL
> security.providers = null
> send.buffer.bytes = 131072
> session.timeout.ms = 30000
> socket.connection.setup.timeout.max.ms = 30000
> socket.connection.setup.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2]
> ssl.endpoint.identification.algorithm = https
> ssl.engine.factory.class = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.certificate.chain = null
> ssl.keystore.key = null
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLSv1.2
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.certificates = null
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> {code}
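> (For completeness, a minimal sketch of how a configuration like the above is
> usually supplied through the KafkaSource builder; the topic name is
> hypothetical, the broker and group id are from the dump, and
> "commit.offsets.on.checkpoint" is the documented switch for the
> checkpoint-time commits mentioned in the note linked above. Disabling it
> should silence these WARNs, at the cost of the consumer-lag monitoring.)
> {code:java}
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
>
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setBootstrapServers("test.host.net:9093") // from the dump above
>         .setGroupId("test-group-id")               // from the dump above
>         .setTopics("input-topic")                  // hypothetical topic name
>         // Matches auto.offset.reset = none: fail when no committed offsets exist.
>         .setStartingOffsets(OffsetsInitializer.committedOffsets())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         // Set to "false" to skip committing offsets on checkpoints entirely.
>         .setProperty("commit.offsets.on.checkpoint", "false")
>         .build();
> {code}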
--
This message was sent by Atlassian Jira
(v8.20.7#820007)