[ 
https://issues.apache.org/jira/browse/NIFI-14160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18011123#comment-18011123
 ] 

Josef Zahner commented on NIFI-14160:
-------------------------------------

We finally managed to solve the issue that the records are pushed out too early 
- doesn't make sense to mention the details here as it was related to something 
else.

However, with NiFi 2.5.0, even if we set the "Commit Offset" to true, the 
parameter "enable.auto.commit = false" is now {*}always false{*}. So the 
opposite of what it was with NiFi 2.1.0. I don't get it.

> ConsumeKafka processor => Commit Offsets set to false, offsets are committed 
> regardless
> ---------------------------------------------------------------------------------------
>
>                 Key: NIFI-14160
>                 URL: https://issues.apache.org/jira/browse/NIFI-14160
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 2.1.0
>            Reporter: christofe lintermans
>            Priority: Major
>
> I want to achieve exactly-once semantic Kafka.
> The consumer does not commit the offset. Later in the flow, when the 
> processing is correct, we send the message again to PublishKafka processor, 
> based on the attributes generated from the ConsumeKafka processor, it should 
> commit the offsets.
>  
> But when I schedule the ConsumeKafka processor, I notice that the property 
> Commit Offsets is not taken in consideration, and the offsets are committed 
> by the consumer.
> INFO [Timer-Driven Process Thread-10] o.a.k.clients.consumer.ConsumerConfig 
> ConsumerConfig values:
> allow.auto.create.topics = true
> auto.commit.interval.ms = 5000
> auto.include.jmx.reporter = true
> auto.offset.reset = latest
> bootstrap.servers = [broker-1:19092, broker-2:19092, broker-3:19092]
> check.crcs = true
> client.dns.lookup = use_all_dns_ips
> client.id = consumer-Consumer-19
> client.rack =
> connections.max.idle.ms = 540000
> default.api.timeout.ms = 60000
> *enable.auto.commit = true*
> enable.metrics.push = true
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = Consumer
> group.instance.id = null
> group.protocol = classic
> group.remote.assignor = null
> heartbeat.interval.ms = 3000
> interceptor.classes = []
> internal.leave.group.on.close = true
> internal.throw.on.fetch.stable.offset.unsupported = false
> isolation.level = read_committed
> key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 10000
> metadata.max.age.ms = 300000
> metadata.recovery.strategy = none
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor, class 
> org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 30000
> retry.backoff.max.ms = 1000
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.connect.timeout.ms = null
> sasl.login.read.timeout.ms = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.login.retry.backoff.max.ms = 10000
> sasl.login.retry.backoff.ms = 100
> sasl.mechanism = GSSAPI
> sasl.oauthbearer.clock.skew.seconds = 30
> sasl.oauthbearer.expected.audience = null
> sasl.oauthbearer.expected.issuer = null
> sasl.oauthbearer.header.urlencode = false
> sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
> sasl.oauthbearer.jwks.endpoint.url = null
> sasl.oauthbearer.scope.claim.name = scope
> sasl.oauthbearer.sub.claim.name = sub
> sasl.oauthbearer.token.endpoint.url = null
> security.protocol = PLAINTEXT
> security.providers = null
> send.buffer.bytes = 131072
> session.timeout.ms = 45000
> socket.connection.setup.timeout.max.ms = 30000
> socket.connection.setup.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
> ssl.endpoint.identification.algorithm = https
> ssl.engine.factory.class = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.certificate.chain = null
> ssl.keystore.key = null
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLSv1.3
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.certificates = null
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to