prabhu biradar created KAFKA-12279:
--------------------------------------
Summary: Kafka 2.7 stream app issue
Key: KAFKA-12279
URL: https://issues.apache.org/jira/browse/KAFKA-12279
Project: Kafka
Issue Type: Bug
Reporter: prabhu biradar
Fix For: 2.7.0
After starting the stream application, the exception below is thrown and the
stream threads stop processing.
2021-02-02T22:29:10.416::AccountId ::Partition ::Offset
::[com.xx.xx.xx.xx.xx-client-StreamThread-1]::ERROR::o.a.k.s.processor.internals.StreamThread::stream-thread
[com.xx.xx.xx.xx.xx-client-StreamThread-1] Encountered the following exception
during processing and the thread is going to shut down:
2021-02-02T22:29:10.416::AccountId ::Partition ::Offset
::[com.xx.xx.xx.xx.xx-client-StreamThread-1]::ERROR::o.a.k.s.processor.internals.StreamThread::stream-thread
[com.xx.xx.xx.xx.xx-client-StreamThread-1] Encountered the following exception
during processing and the thread is going to shut down:
java.lang.IllegalStateException: Tried to lookup lag for unknown task 0_13 at
org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:318)
at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511)
at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216) at
java.util.TreeMap.put(TreeMap.java:552) at
java.util.TreeSet.add(TreeSet.java:255) at
java.util.AbstractCollection.addAll(AbstractCollection.java:344) at
java.util.TreeSet.addAll(TreeSet.java:312) at
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1299)
at
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1213)
at
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:939)
at
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:393)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:111)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:599)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:562)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1164)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1139)
at
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
at
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
at
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1292)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
at
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:801)
at
org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:763)
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:614)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
2021-02-02T22:29:10.416::AccountId ::Partition ::Offset
::[com.xx.xx.xx.xx.xxx-client-StreamThread-1]::ERROR::o.a.k.s.processor.internals.StreamThread::stream-thread
[com.xx.xx.xx.xx.xxx-client-StreamThread-1] Encountered the following
exception during processing and the thread is going to shut down:
java.lang.IllegalStateException: Tried to lookup lag for unknown task 0_13 at
org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:318)
at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511)
at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216) at
java.util.TreeMap.put(TreeMap.java:552) at
java.util.TreeSet.add(TreeSet.java:255) at
java.util.AbstractCollection.addAll(AbstractCollection.java:344) at
java.util.TreeSet.addAll(TreeSet.java:312) at
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1299)
at
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1213)
at
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:939)
at
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:393)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:111)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:599)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:562)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1164)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1139)
at
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
at
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
at
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1292)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
at
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:801)
at
org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:763)
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:614)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
--
This message was sent by Atlassian Jira
(v8.3.4#803005)