[
https://issues.apache.org/jira/browse/NIFI-14160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17932313#comment-17932313
]
Josef Zahner edited comment on NIFI-14160 at 3/4/25 1:19 PM:
-------------------------------------------------------------
In NiFi 2.2.0 it's still the same behavior as [~lintermans] mentioned. When we
set "Commit Offsets" to {{false}}, the Kafka API value
"{{enable.auto.commit}}" is always true.
In our case it's even worse: we have to adapt "Max Uncommitted Time", but even
though we set a very high "{{max.poll.records}}" value (which appears to be
applied correctly, based on the logged "ConsumerConfig" values), the FlowFiles
get pushed out far too small, so the time change doesn't seem to have any
effect. Nothing in the ConsumerConfig values changes when we change "Max
Uncommitted Time".
If there is no lag for the topic and the consumer group, ConsumeKafka logs the
following DEBUG line, where "PT5S" is the value from "Max Uncommitted Time".
So the time value is picked up, but in our opinion it has no effect:
{code:java}
2025-03-04 14:14:54,613 DEBUG [Timer-Driven Process Thread-3]
o.a.nifi.kafka.processors.ConsumeKafka
ConsumeKafka[id=198a4339-e523-32ac-bcea-287bf976b9f3] No Kafka Records
consumed: Group ID [lmygroup.id] Topics [my.topic] Topic Pattern [null] Auto
Offset Reset [LATEST] Max Uncommitted Time [PT5S]{code}
In the end the performance is very poor because the FlowFiles are far too small.
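For reference, a minimal sketch (plain Java, no Kafka client dependency) of the mapping we would expect between the processor property and the client config. {{ConsumerConfigSketch}} and {{buildConsumerProps}} are hypothetical names for illustration only, not NiFi code:

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Hypothetical helper: mirror the NiFi "Commit Offsets" processor
    // property into the Kafka client property "enable.auto.commit".
    // The bug report shows the client always logging
    // "enable.auto.commit = true" regardless of this setting.
    static Properties buildConsumerProps(boolean commitOffsets) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker-1:19092");
        props.setProperty("group.id", "Consumer");
        // Expected: false here when "Commit Offsets" is false,
        // so offsets can be committed manually later in the flow.
        props.setProperty("enable.auto.commit", Boolean.toString(commitOffsets));
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConsumerProps(false);
        System.out.println("enable.auto.commit = "
                + props.getProperty("enable.auto.commit"));
    }
}
```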
> ConsumeKafka processor => Commit Offsets set to false, offsets are committed
> regardless
> ---------------------------------------------------------------------------------------
>
> Key: NIFI-14160
> URL: https://issues.apache.org/jira/browse/NIFI-14160
> Project: Apache NiFi
> Issue Type: Bug
> Affects Versions: 2.1.0
> Reporter: christofe lintermans
> Priority: Major
>
> I want to achieve exactly-once semantics with Kafka.
> The consumer does not commit the offset. Later in the flow, once the
> processing has succeeded, we send the message to a PublishKafka processor,
> which, based on the attributes generated by the ConsumeKafka processor,
> should commit the offsets.
>
> But when I schedule the ConsumeKafka processor, I notice that the property
> Commit Offsets is not taken into consideration, and the offsets are
> committed by the consumer.
> INFO [Timer-Driven Process Thread-10] o.a.k.clients.consumer.ConsumerConfig
> ConsumerConfig values:
> allow.auto.create.topics = true
> auto.commit.interval.ms = 5000
> auto.include.jmx.reporter = true
> auto.offset.reset = latest
> bootstrap.servers = [broker-1:19092, broker-2:19092, broker-3:19092]
> check.crcs = true
> client.dns.lookup = use_all_dns_ips
> client.id = consumer-Consumer-19
> client.rack =
> connections.max.idle.ms = 540000
> default.api.timeout.ms = 60000
> *enable.auto.commit = true*
> enable.metrics.push = true
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = Consumer
> group.instance.id = null
> group.protocol = classic
> group.remote.assignor = null
> heartbeat.interval.ms = 3000
> interceptor.classes = []
> internal.leave.group.on.close = true
> internal.throw.on.fetch.stable.offset.unsupported = false
> isolation.level = read_committed
> key.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 10000
> metadata.max.age.ms = 300000
> metadata.recovery.strategy = none
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class
> org.apache.kafka.clients.consumer.RangeAssignor, class
> org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 30000
> retry.backoff.max.ms = 1000
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.connect.timeout.ms = null
> sasl.login.read.timeout.ms = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.login.retry.backoff.max.ms = 10000
> sasl.login.retry.backoff.ms = 100
> sasl.mechanism = GSSAPI
> sasl.oauthbearer.clock.skew.seconds = 30
> sasl.oauthbearer.expected.audience = null
> sasl.oauthbearer.expected.issuer = null
> sasl.oauthbearer.header.urlencode = false
> sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
> sasl.oauthbearer.jwks.endpoint.url = null
> sasl.oauthbearer.scope.claim.name = scope
> sasl.oauthbearer.sub.claim.name = sub
> sasl.oauthbearer.token.endpoint.url = null
> security.protocol = PLAINTEXT
> security.providers = null
> send.buffer.bytes = 131072
> session.timeout.ms = 45000
> socket.connection.setup.timeout.max.ms = 30000
> socket.connection.setup.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
> ssl.endpoint.identification.algorithm = https
> ssl.engine.factory.class = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.certificate.chain = null
> ssl.keystore.key = null
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLSv1.3
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.certificates = null
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
--
This message was sent by Atlassian Jira
(v8.20.10#820010)