[ 
https://issues.apache.org/jira/browse/NIFI-14160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17932313#comment-17932313
 ] 

Josef Zahner edited comment on NIFI-14160 at 3/4/25 1:19 PM:
-------------------------------------------------------------

In NiFi 2.2.0 it's still the same behavior as [~lintermans] mentioned. When we 
set "Commit Offsets" to {{{}false{}}}, the Kafka API value 
"{{{}enable.auto.commit{}}}" is still always true.
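For comparison, this is a minimal sketch (using only {{java.util.Properties}}, with placeholder broker/group names and no real broker or kafka-clients dependency) of the consumer configuration one would expect the processor to build when "Commit Offsets" is false:

```java
import java.util.Properties;

// Hypothetical sketch: manual offset management requires
// enable.auto.commit=false on the underlying Kafka consumer.
public class ManualCommitConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1:19092"); // placeholder broker
        props.put("group.id", "my-group");                // placeholder group
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // The property at the heart of this issue: with "Commit Offsets"
        // set to false this should be false, yet the logged ConsumerConfig
        // reports enable.auto.commit = true.
        props.put("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("enable.auto.commit"));
    }
}
```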

In our case it's even worse: we have to adapt the "Max Uncommitted Time", but 
even with a very high "{{{}max.poll.records{}}}" value (which is correctly 
reflected in the logged "ConsumerConfig" values), the flow files are pushed 
out far too small, so the time change doesn't seem to have any effect. The 
ConsumerConfig values don't change at all when we change the "Max 
Uncommitted Time".

If there is no lag for the topic and the consumer group, ConsumeKafka logs the 
following DEBUG line, where "PT5S" appears to be the value from "Max 
Uncommitted Time". So the time value is picked up, but in our opinion it has 
no effect...

 
{code:java}
2025-03-04 14:14:54,613 DEBUG [Timer-Driven Process Thread-3] o.a.nifi.kafka.processors.ConsumeKafka ConsumeKafka[id=198a4339-e523-32ac-bcea-287bf976b9f3] No Kafka Records consumed: Group ID [lmygroup.id] Topics [my.topic] Topic Pattern [null] Auto Offset Reset [LATEST] Max Uncommitted Time [PT5S]{code}
In the end, performance is very poor because the flow files are far too small.

 



> ConsumeKafka processor => Commit Offsets set to false, offsets are committed 
> regardless
> ---------------------------------------------------------------------------------------
>
>                 Key: NIFI-14160
>                 URL: https://issues.apache.org/jira/browse/NIFI-14160
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 2.1.0
>            Reporter: christofe lintermans
>            Priority: Major
>
> I want to achieve exactly-once semantics with Kafka.
> The consumer does not commit the offset. Later in the flow, once 
> processing has completed successfully, we send the message to a 
> PublishKafka processor, which should commit the offsets based on the 
> attributes generated by the ConsumeKafka processor.
>  
> But when I schedule the ConsumeKafka processor, I notice that the Commit 
> Offsets property is not taken into consideration, and the offsets are 
> committed by the consumer.
> INFO [Timer-Driven Process Thread-10] o.a.k.clients.consumer.ConsumerConfig 
> ConsumerConfig values:
> allow.auto.create.topics = true
> auto.commit.interval.ms = 5000
> auto.include.jmx.reporter = true
> auto.offset.reset = latest
> bootstrap.servers = [broker-1:19092, broker-2:19092, broker-3:19092]
> check.crcs = true
> client.dns.lookup = use_all_dns_ips
> client.id = consumer-Consumer-19
> client.rack =
> connections.max.idle.ms = 540000
> default.api.timeout.ms = 60000
> *enable.auto.commit = true*
> enable.metrics.push = true
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = Consumer
> group.instance.id = null
> group.protocol = classic
> group.remote.assignor = null
> heartbeat.interval.ms = 3000
> interceptor.classes = []
> internal.leave.group.on.close = true
> internal.throw.on.fetch.stable.offset.unsupported = false
> isolation.level = read_committed
> key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 10000
> metadata.max.age.ms = 300000
> metadata.recovery.strategy = none
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor, class 
> org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 30000
> retry.backoff.max.ms = 1000
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.connect.timeout.ms = null
> sasl.login.read.timeout.ms = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.login.retry.backoff.max.ms = 10000
> sasl.login.retry.backoff.ms = 100
> sasl.mechanism = GSSAPI
> sasl.oauthbearer.clock.skew.seconds = 30
> sasl.oauthbearer.expected.audience = null
> sasl.oauthbearer.expected.issuer = null
> sasl.oauthbearer.header.urlencode = false
> sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
> sasl.oauthbearer.jwks.endpoint.url = null
> sasl.oauthbearer.scope.claim.name = scope
> sasl.oauthbearer.sub.claim.name = sub
> sasl.oauthbearer.token.endpoint.url = null
> security.protocol = PLAINTEXT
> security.providers = null
> send.buffer.bytes = 131072
> session.timeout.ms = 45000
> socket.connection.setup.timeout.max.ms = 30000
> socket.connection.setup.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
> ssl.endpoint.identification.algorithm = https
> ssl.engine.factory.class = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.certificate.chain = null
> ssl.keystore.key = null
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLSv1.3
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.certificates = null
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
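The manual-commit behavior the reporter expects, i.e. commit only after downstream processing succeeds, so a failure leaves the offset untouched and the record is re-delivered, can be sketched without a broker as a small in-memory simulation (all names and the offset map are hypothetical, not NiFi or Kafka API):

```java
import java.util.HashMap;
import java.util.Map;

// Broker-free simulation of manual offset commits: an offset only
// advances when processing succeeds, mirroring what "Commit Offsets"
// = false plus a later explicit commit should provide.
public class ManualCommitSim {
    final Map<Integer, Long> committed = new HashMap<>(); // partition -> next offset to read

    // Returns true if the record was processed and its offset committed.
    boolean consume(int partition, long offset, boolean processingSucceeds) {
        long expected = committed.getOrDefault(partition, 0L);
        if (offset != expected) return false;  // only consume in order
        if (!processingSucceeds) return false; // failure: no commit, record stays pending
        committed.put(partition, offset + 1);  // commit: advance to the next offset
        return true;
    }

    public static void main(String[] args) {
        ManualCommitSim sim = new ManualCommitSim();
        sim.consume(0, 0, true);                  // processed, committed
        sim.consume(0, 1, false);                 // failed: offset stays at 1
        System.out.println(sim.committed.get(0)); // prints 1
        sim.consume(0, 1, true);                  // retry succeeds
        System.out.println(sim.committed.get(0)); // prints 2
    }
}
```

With auto-commit forced on, as in the logged ConsumerConfig above, the offset advances regardless of processing outcome, which is exactly what breaks this pattern.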



--
This message was sent by Atlassian Jira
(v8.20.10#820010)