[ https://issues.apache.org/jira/browse/NIFI-14160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18013764#comment-18013764 ]

Paul Grey commented on NIFI-14160:
----------------------------------

Hi.

The behavior noted by [~jzahner] can probably be explained here [1].  It seems 
this was added in NIFI-14598 to work around an issue with auto-commit offset 
management.  Inferring from the Kafka documentation [2], the Kafka client runs 
a background thread that periodically commits offsets, and that conflicts with 
NiFi's management of the offsets.  The intended effect (I think) is that only 
NiFi should (explicitly) manage the offsets of partitions where it is a 
consumer.
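For illustration only, here is a minimal sketch (not the actual Kafka3ConnectionService code; the class and method names are hypothetical) of the pattern described above: forcing "enable.auto.commit" off in the consumer properties, so that offsets advance only when the framework calls commitSync()/commitAsync() explicitly and the client's background auto-commit thread never fires.

```java
import java.util.Properties;

// Hypothetical helper sketching the override: whatever the user supplies,
// auto-commit is forced off so that only explicit commit calls move offsets.
public class ConsumerOffsetConfig {

    public static Properties buildProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Disable the Kafka client's periodic background commit
        // (auto.commit.interval.ms becomes irrelevant once this is false).
        props.setProperty("enable.auto.commit", "false");
        return props;
    }
}
```

With these properties, a KafkaConsumer would only commit offsets via explicit commitSync()/commitAsync() calls, which is the behavior the connection service appears to rely on.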

I don't think there is a linkage between the NiFi processor "Commit Offsets" 
setting [3] (which controls synchronous NiFi behavior) and "enable.auto.commit" 
(which controls asynchronous Kafka behavior).  That linkage may have existed in 
the past...
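To make the synchronous side of that distinction concrete, here is a toy sketch (illustrative only, not NiFi code) of what explicit offset management looks like: a committed position advances only when processing of a record is acknowledged as successful, mirroring KafkaConsumer.commitSync() semantics where the committed offset is the next offset to read.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of explicit (synchronous) offset management: nothing moves
// unless the application acknowledges success, in contrast to Kafka's
// asynchronous background auto-commit thread.
public class ManualOffsetTracker {

    private final Map<Integer, Long> committed = new HashMap<>();

    // Record a successfully processed offset; the committed position becomes
    // offset + 1 (the next record to consume), and never rewinds on a late
    // or duplicate acknowledgment.
    public void acknowledge(int partition, long offset) {
        committed.merge(partition, offset + 1, Math::max);
    }

    public long committedOffset(int partition) {
        return committed.getOrDefault(partition, 0L);
    }
}
```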

There has been significant ongoing refinement work on the Kafka processors in 
the 2.x line.  Please call out any functional misbehavior you observe in the 
current NAR.  It is a goal of the project to have NiFi and all of its 
components undergo continuous improvement, and users really help focus 
attention on problem areas.

Cheers!


[1] 
https://github.com/apache/nifi/blob/e45d5d7d6fd0b31af3ac6ecbca51adfa0555a7d4/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java#L236

[2] 
https://github.com/apache/kafka/blob/4.0/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L136-L140

[3] 
https://github.com/apache/nifi/blob/52bffaddf84add6e420581fa9ee641d5efdaa983/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java#L154-L162

> ConsumeKafka processor => Commit Offsets set to false, offsets are committed 
> regardless
> ---------------------------------------------------------------------------------------
>
>                 Key: NIFI-14160
>                 URL: https://issues.apache.org/jira/browse/NIFI-14160
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 2.1.0
>            Reporter: christofe lintermans
>            Priority: Major
>
> I want to achieve exactly-once semantics with Kafka.
> The consumer does not commit the offset. Later in the flow, once processing 
> succeeds, we send the message on to the PublishKafka processor; based on the 
> attributes generated by the ConsumeKafka processor, it should then commit 
> the offsets.
>  
> But when I schedule the ConsumeKafka processor, I notice that the Commit 
> Offsets property is not taken into account, and the offsets are committed 
> by the consumer.
> INFO [Timer-Driven Process Thread-10] o.a.k.clients.consumer.ConsumerConfig 
> ConsumerConfig values:
> allow.auto.create.topics = true
> auto.commit.interval.ms = 5000
> auto.include.jmx.reporter = true
> auto.offset.reset = latest
> bootstrap.servers = [broker-1:19092, broker-2:19092, broker-3:19092]
> check.crcs = true
> client.dns.lookup = use_all_dns_ips
> client.id = consumer-Consumer-19
> client.rack =
> connections.max.idle.ms = 540000
> default.api.timeout.ms = 60000
> *enable.auto.commit = true*
> enable.metrics.push = true
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = Consumer
> group.instance.id = null
> group.protocol = classic
> group.remote.assignor = null
> heartbeat.interval.ms = 3000
> interceptor.classes = []
> internal.leave.group.on.close = true
> internal.throw.on.fetch.stable.offset.unsupported = false
> isolation.level = read_committed
> key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 10000
> metadata.max.age.ms = 300000
> metadata.recovery.strategy = none
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor, class 
> org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 30000
> retry.backoff.max.ms = 1000
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.connect.timeout.ms = null
> sasl.login.read.timeout.ms = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.login.retry.backoff.max.ms = 10000
> sasl.login.retry.backoff.ms = 100
> sasl.mechanism = GSSAPI
> sasl.oauthbearer.clock.skew.seconds = 30
> sasl.oauthbearer.expected.audience = null
> sasl.oauthbearer.expected.issuer = null
> sasl.oauthbearer.header.urlencode = false
> sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
> sasl.oauthbearer.jwks.endpoint.url = null
> sasl.oauthbearer.scope.claim.name = scope
> sasl.oauthbearer.sub.claim.name = sub
> sasl.oauthbearer.token.endpoint.url = null
> security.protocol = PLAINTEXT
> security.providers = null
> send.buffer.bytes = 131072
> session.timeout.ms = 45000
> socket.connection.setup.timeout.max.ms = 30000
> socket.connection.setup.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
> ssl.endpoint.identification.algorithm = https
> ssl.engine.factory.class = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.certificate.chain = null
> ssl.keystore.key = null
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLSv1.3
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.certificates = null
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer



--
This message was sent by Atlassian Jira
(v8.20.10#820010)