christofe lintermans created NIFI-14160:
-------------------------------------------

             Summary: ConsumeKafka processor => Commit Offsets set to false, 
offsets are committed regardless
                 Key: NIFI-14160
                 URL: https://issues.apache.org/jira/browse/NIFI-14160
             Project: Apache NiFi
          Issue Type: Bug
    Affects Versions: 2.1.0
            Reporter: christofe lintermans


I want to achieve exactly-once semantics with Kafka.

The consumer does not commit the offset itself. Later in the flow, once the 
processing has completed successfully, we send the message to a PublishKafka 
processor which, based on the attributes generated by the ConsumeKafka 
processor, should commit the offsets.

 

But when I schedule the ConsumeKafka processor, I notice that the property 
Commit Offsets is not taken into consideration, and the offsets are committed 
by the consumer anyway.

INFO [Timer-Driven Process Thread-10] o.a.k.clients.consumer.ConsumerConfig 
ConsumerConfig values:

allow.auto.create.topics = true

auto.commit.interval.ms = 5000

auto.include.jmx.reporter = true

auto.offset.reset = latest

bootstrap.servers = [broker-1:19092, broker-2:19092, broker-3:19092]

check.crcs = true

client.dns.lookup = use_all_dns_ips

client.id = consumer-Consumer-19

client.rack =

connections.max.idle.ms = 540000

default.api.timeout.ms = 60000

*enable.auto.commit = true*

enable.metrics.push = true

exclude.internal.topics = true

fetch.max.bytes = 52428800

fetch.max.wait.ms = 500

fetch.min.bytes = 1

group.id = Consumer

group.instance.id = null

group.protocol = classic

group.remote.assignor = null

heartbeat.interval.ms = 3000

interceptor.classes = []

internal.leave.group.on.close = true

internal.throw.on.fetch.stable.offset.unsupported = false

isolation.level = read_committed

key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer

max.partition.fetch.bytes = 1048576

max.poll.interval.ms = 300000

max.poll.records = 10000

metadata.max.age.ms = 300000

metadata.recovery.strategy = none

metric.reporters = []

metrics.num.samples = 2

metrics.recording.level = INFO

metrics.sample.window.ms = 30000

partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor, class 
org.apache.kafka.clients.consumer.CooperativeStickyAssignor]

receive.buffer.bytes = 65536

reconnect.backoff.max.ms = 1000

reconnect.backoff.ms = 50

request.timeout.ms = 30000

retry.backoff.max.ms = 1000

retry.backoff.ms = 100

sasl.client.callback.handler.class = null

sasl.jaas.config = null

sasl.kerberos.kinit.cmd = /usr/bin/kinit

sasl.kerberos.min.time.before.relogin = 60000

sasl.kerberos.service.name = null

sasl.kerberos.ticket.renew.jitter = 0.05

sasl.kerberos.ticket.renew.window.factor = 0.8

sasl.login.callback.handler.class = null

sasl.login.class = null

sasl.login.connect.timeout.ms = null

sasl.login.read.timeout.ms = null

sasl.login.refresh.buffer.seconds = 300

sasl.login.refresh.min.period.seconds = 60

sasl.login.refresh.window.factor = 0.8

sasl.login.refresh.window.jitter = 0.05

sasl.login.retry.backoff.max.ms = 10000

sasl.login.retry.backoff.ms = 100

sasl.mechanism = GSSAPI

sasl.oauthbearer.clock.skew.seconds = 30

sasl.oauthbearer.expected.audience = null

sasl.oauthbearer.expected.issuer = null

sasl.oauthbearer.header.urlencode = false

sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000

sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000

sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100

sasl.oauthbearer.jwks.endpoint.url = null

sasl.oauthbearer.scope.claim.name = scope

sasl.oauthbearer.sub.claim.name = sub

sasl.oauthbearer.token.endpoint.url = null

security.protocol = PLAINTEXT

security.providers = null

send.buffer.bytes = 131072

session.timeout.ms = 45000

socket.connection.setup.timeout.max.ms = 30000

socket.connection.setup.timeout.ms = 10000

ssl.cipher.suites = null

ssl.enabled.protocols = [TLSv1.2, TLSv1.3]

ssl.endpoint.identification.algorithm = https

ssl.engine.factory.class = null

ssl.key.password = null

ssl.keymanager.algorithm = SunX509

ssl.keystore.certificate.chain = null

ssl.keystore.key = null

ssl.keystore.location = null

ssl.keystore.password = null

ssl.keystore.type = JKS

ssl.protocol = TLSv1.3

ssl.provider = null

ssl.secure.random.implementation = null

ssl.trustmanager.algorithm = PKIX

ssl.truststore.certificates = null

ssl.truststore.location = null

ssl.truststore.password = null

ssl.truststore.type = JKS

value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to