Dmytro created FLINK-27962:
------------------------------
Summary: KafkaSourceReader fails to commit consumer offsets for checkpoints
Key: FLINK-27962
URL: https://issues.apache.org/jira/browse/FLINK-27962
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.14.4, 1.15.0
Reporter: Dmytro
The KafkaSourceReader works well for many hours, then fails, reconnects
successfully, and continues to work for some time. After the first three failures
it hangs on "Offset commit failed" and never reconnects. Restarting the
Flink job helps, and it then works until the next three failures.
*Failed to commit consumer offsets for checkpoint:*
{code:java}
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The
coordinator is not available.
2022-06-06 14:19:52,297 WARN
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to
commit consumer offsets for checkpoint 464521
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit
failed with a retriable exception. You should retry committing the latest
consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The
coordinator is not available.
2022-06-06 14:20:02,297 WARN
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to
commit consumer offsets for checkpoint 464522
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit
failed with a retriable exception. You should retry committing the latest
consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The
coordinator is not available.
2022-06-06 14:20:02,297 WARN
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to
commit consumer offsets for checkpoint 464523
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit
failed with a retriable exception. You should retry committing the latest
consumed offsets
..... fails permanently until the job is restarted
{code}
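To check whether the failures really come in runs of three consecutive checkpoints, the TaskManager log can be scanned for the WARN message shown above. The following is only a minimal sketch: the class `CommitFailureScan` and its methods are hypothetical helpers written for this report, not part of Flink or Kafka, and the sample text in `main` is abbreviated from the log excerpt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for log triage; not part of Flink or Kafka.
class CommitFailureScan {
    // Matches the WARN message emitted by KafkaSourceReader in the excerpt above.
    private static final Pattern FAILED =
            Pattern.compile("Failed to commit consumer offsets for checkpoint (\\d+)");

    // Returns the checkpoint IDs whose offset commit failed, in log order.
    static List<Long> failedCheckpoints(String log) {
        // Collapse all whitespace so hard-wrapped log lines still match.
        String flat = log.replaceAll("\\s+", " ");
        List<Long> ids = new ArrayList<>();
        Matcher m = FAILED.matcher(flat);
        while (m.find()) {
            ids.add(Long.parseLong(m.group(1)));
        }
        return ids;
    }

    // Length of the longest run of directly consecutive checkpoint IDs.
    static int longestRun(List<Long> ids) {
        if (ids.isEmpty()) {
            return 0;
        }
        int best = 1;
        int current = 1;
        for (int i = 1; i < ids.size(); i++) {
            long prev = ids.get(i - 1);
            long next = ids.get(i);
            current = (next == prev + 1) ? current + 1 : 1;
            best = Math.max(best, current);
        }
        return best;
    }

    public static void main(String[] args) {
        // Abbreviated sample, wrapped the same way as the excerpt.
        String sample =
                "KafkaSourceReader [] - Failed to\n"
              + "commit consumer offsets for checkpoint 464521\n"
              + "KafkaSourceReader [] - Failed to\n"
              + "commit consumer offsets for checkpoint 464522\n";
        List<Long> ids = failedCheckpoints(sample);
        System.out.println("failed checkpoints: " + ids);
        System.out.println("longest consecutive run: " + longestRun(ids));
    }
}
```

A run length that keeps growing after the third failure would match the "fails permanently" behavior described here; it does not by itself explain why the coordinator stays unavailable.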
*Consumer Config:*
{code:java}
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [test.host.net:9093]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = test-client-id
client.rack =
connections.max.idle.ms = 180000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = test-group-id
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 180000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 60000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = class com.test.kafka.security.AzureAuthenticateCallbackHandler
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = OAUTHBEARER
security.protocol = SASL_SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 30000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
{code}
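As a quick sanity check on the timing values in the dump above, the settings can be loaded with `java.util.Properties` and compared against the guidance in the Kafka consumer documentation that `heartbeat.interval.ms` must be lower than `session.timeout.ms` and is typically no higher than one third of it. This is only a sketch; `ConsumerTimingCheck` is a hypothetical helper, and passing the check merely rules out one kind of misconfiguration rather than explaining the failure.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for checking a consumer config dump; not part of Flink or Kafka.
class ConsumerTimingCheck {
    // Parses "key = value" lines as printed in the consumer config dump.
    static Properties parse(String dump) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(dump));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Reads a millisecond setting as a long; throws if the key is missing.
    static long millis(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("missing setting: " + key);
        }
        return Long.parseLong(value.trim());
    }

    // True if the heartbeat interval is at most one third of the session timeout.
    static boolean heartbeatWithinThird(Properties props) {
        long heartbeat = millis(props, "heartbeat.interval.ms");
        long session = millis(props, "session.timeout.ms");
        return heartbeat * 3 <= session;
    }

    public static void main(String[] args) {
        // Values copied from the config dump in this report.
        Properties props = parse(
                "heartbeat.interval.ms = 3000\n"
              + "session.timeout.ms = 30000\n");
        System.out.println("heartbeat within 1/3 of session timeout: "
                + heartbeatWithinThird(props));
    }
}
```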