[ https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17554240#comment-17554240 ]
Dmytro commented on FLINK-27962:
--------------------------------
Hi [~martijnvisser], [~renqs]! Just FYI, the logs have been attached.
> KafkaSourceReader fails to commit consumer offsets for checkpoints
> ------------------------------------------------------------------
>
> Key: FLINK-27962
> URL: https://issues.apache.org/jira/browse/FLINK-27962
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.15.0, 1.14.4
> Reporter: Dmytro
> Priority: Major
> Attachments: logs.zip
>
>
> The KafkaSourceReader works well for many hours, then fails and reconnects
> successfully, then continues to work for some time. After the first three
> failures it hangs on "Offset commit failed" and never reconnects.
> Restarting the Flink job helps, and it works until the next three failures.
> I am aware of [the
> note|https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#consumer-offset-committing]
> that the Kafka source does NOT rely on committed offsets for fault tolerance.
> Committing offsets only exposes the progress of the consumer and the consumer
> group for monitoring.
> I would agree if the failures were only intermittent, but I would argue that a
> complete failure is unacceptable.
> *Failed to commit consumer offsets for checkpoint:*
> {code:java}
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
> 2022-06-06 14:19:52,297 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464521
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
> 2022-06-06 14:20:02,297 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464522
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
> 2022-06-06 14:20:02,297 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464523
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets
> ..... fails permanently until the job restart
> {code}
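
One way to tell intermittent commit failures from the permanent state described above is to look for runs of consecutive checkpoint IDs among the failed commits. Below is a minimal, stdlib-only Java sketch under that assumption. The class and method names (`CommitFailureScan`, `failedCheckpoints`, `longestConsecutiveRun`) are hypothetical, and the message pattern is copied from the excerpt above, not taken from any Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class CommitFailureScan {
    // Message text as it appears in the excerpt; \s+ tolerates wrapped lines.
    private static final Pattern FAILED = Pattern.compile(
            "Failed\\s+to\\s+commit\\s+consumer\\s+offsets\\s+for\\s+checkpoint\\s+(\\d+)");

    /** Returns the checkpoint IDs whose offset commit failed, in log order. */
    static List<Long> failedCheckpoints(String log) {
        // Drop email-style "> " quote markers so wrapped, quoted logs still match.
        String unquoted = log.replaceAll("(?m)^> ?", "");
        List<Long> ids = new ArrayList<>();
        Matcher m = FAILED.matcher(unquoted);
        while (m.find()) {
            ids.add(Long.parseLong(m.group(1)));
        }
        return ids;
    }

    /** Length of the longest run of consecutive checkpoint IDs (n, n+1, n+2, ...). */
    static int longestConsecutiveRun(List<Long> ids) {
        int best = 0;
        int run = 0;
        Long prev = null;
        for (Long id : ids) {
            run = (prev != null && id == prev + 1) ? run + 1 : 1;
            best = Math.max(best, run);
            prev = id;
        }
        return best;
    }

    public static void main(String[] args) {
        // Sample lines copied from the log excerpt in this issue.
        String log = String.join("\n",
                "2022-06-06 14:19:52,297 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464521",
                "2022-06-06 14:20:02,297 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464522",
                "2022-06-06 14:20:02,297 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464523");
        List<Long> ids = failedCheckpoints(log);
        System.out.println("failed checkpoints: " + ids);
        System.out.println("longest consecutive run: " + longestConsecutiveRun(ids));
    }
}
```

A run that keeps growing with every new checkpoint would match the "fails permanently" pattern, while short isolated runs would point to the intermittent failures that the documentation treats as harmless.
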
> *Consumer Config:*
> {code:java}
> allow.auto.create.topics = true
> auto.commit.interval.ms = 5000
> auto.offset.reset = none
> bootstrap.servers = [test.host.net:9093]
> check.crcs = true
> client.dns.lookup = use_all_dns_ips
> client.id = test-client-id
> client.rack =
> connections.max.idle.ms = 180000
> default.api.timeout.ms = 60000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = test-group-id
> group.instance.id = null
> heartbeat.interval.ms = 3000
> interceptor.classes = []
> internal.leave.group.on.close = true
> internal.throw.on.fetch.stable.offset.unsupported = false
> isolation.level = read_uncommitted
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 500
> metadata.max.age.ms = 180000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 60000
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = [hidden]
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = class com.test.kafka.security.AzureAuthenticateCallbackHandler
> sasl.login.class = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.mechanism = OAUTHBEARER
> security.protocol = SASL_SSL
> security.providers = null
> send.buffer.bytes = 131072
> session.timeout.ms = 30000
> socket.connection.setup.timeout.max.ms = 30000
> socket.connection.setup.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2]
> ssl.endpoint.identification.algorithm = https
> ssl.engine.factory.class = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.certificate.chain = null
> ssl.keystore.key = null
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLSv1.2
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.certificates = null
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> {code}
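
For readers of the config dump, only a few of these settings relate to offset committing. Below is a rough Java sketch of how they interact, based on the linked documentation note. It is a simplified model, not Flink's actual code: `OffsetCommitConfigSketch`, `CommitMode`, `reportedSettings`, and `commitMode` are hypothetical names. `commit.offsets.on.checkpoint` is the Flink-side source option, which does not appear in the dump and is assumed here to be at its documented default of `true`.

```java
import java.util.Properties;

class OffsetCommitConfigSketch {
    enum CommitMode { ON_CHECKPOINT, KAFKA_AUTO_COMMIT, NONE }

    /** The offset-related settings from the consumer config dump in this issue. */
    static Properties reportedSettings() {
        Properties p = new Properties();
        p.setProperty("group.id", "test-group-id");
        p.setProperty("enable.auto.commit", "false");
        p.setProperty("auto.commit.interval.ms", "5000");
        // Assumption: not shown in the dump; documented default of the Flink Kafka source.
        p.setProperty("commit.offsets.on.checkpoint", "true");
        return p;
    }

    /**
     * Simplified model: with checkpointing enabled, offsets are committed when a
     * checkpoint completes; otherwise the Kafka consumer's own periodic auto-commit
     * applies, if it is enabled.
     */
    static CommitMode commitMode(Properties p, boolean checkpointingEnabled) {
        boolean onCheckpoint =
                Boolean.parseBoolean(p.getProperty("commit.offsets.on.checkpoint", "true"));
        if (checkpointingEnabled && onCheckpoint) {
            return CommitMode.ON_CHECKPOINT;
        }
        // Assumption: treated as disabled when unset, matching the value in the dump.
        boolean autoCommit =
                Boolean.parseBoolean(p.getProperty("enable.auto.commit", "false"));
        return autoCommit ? CommitMode.KAFKA_AUTO_COMMIT : CommitMode.NONE;
    }

    public static void main(String[] args) {
        Properties p = reportedSettings();
        System.out.println("checkpointing on:  " + commitMode(p, true));
        System.out.println("checkpointing off: " + commitMode(p, false));
    }
}
```

Under this model the reported job commits only on checkpoint completion, which is consistent with each warning in the log naming a checkpoint ID. In a real job, properties like these could be passed to the source via `KafkaSource.builder().setProperties(...)`.
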