[ 
https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508661#comment-16508661
 ] 

Eugen Feller edited comment on KAFKA-6977 at 6/11/18 8:13 PM:
--------------------------------------------------------------

Hi [~mjsax]. Interesting, thanks a lot. The topic itself is likely consumed by multiple downstream consumers, but this job is the only one that consumes it in order to write out to a database. I just tested consuming the topic with a plain KafkaConsumer and it works (a sketch of that test follows the stack traces below). I also went over all the stack traces seen in that context and found the following:
{code:java}
KafkaException: Record for partition our_stats_topic_0-17 at offset 217641273 is invalid, cause: Record is corrupt (stored crc = 3302026163, computed crc = 3432237873)
    at maybeEnsureValid (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1002)
    at nextFetchedRecord (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1059)
    at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1090)
    at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944)
    at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567)
    at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528)
    at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086)
    at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043)
    at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536)
    at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490)
    at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480)
    at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457)

KafkaException: Received exception when fetching the next record from our_stats_topic_0-17. If needed, please seek past the record to continue consumption.
    at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1110)
    at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944)
    at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567)
    at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528)
    at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086)
    at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043)
    at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536)
    at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490)
    at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480)
    at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457)
{code}
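For completeness, the plain-consumer test mentioned above was along these lines (a minimal sketch; the bootstrap servers, group id, and poll bounds here are placeholders, not our real config):
{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PlainConsumerCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "plain-consumer-check");    // throwaway group id
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("our_stats_topic_0"));
            for (int i = 0; i < 100; i++) {
                // poll(long) is the 0.11.x API; newer clients take a Duration.
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000L);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    System.out.printf("partition=%d offset=%d size=%d%n",
                            record.partition(), record.offset(),
                            record.serializedValueSize());
                }
            }
        }
    }
}
{code}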
In this particular instance, I think the following stack trace was seen at the same time as error code 2:
{code:java}
CorruptRecordException: Record size is less than the minimum record overhead (14)

KafkaException: Received exception when fetching the next record from our_stats_topic_0-16. If needed, please seek past the record to continue consumption.
    at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1076)
    at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944)
    at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567)
    at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528)
    at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086)
    at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043)
    at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536)
    at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490)
    at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480)
    at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457)
{code}
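If I read the client's Errors enum right, fetch error code 2 maps to CORRUPT_MESSAGE, which lines up with the CorruptRecordException above. A quick way to check the mapping (Errors ships with kafka-clients):
{code:java}
import org.apache.kafka.common.protocol.Errors;

public class ErrorCodeLookup {
    public static void main(String[] args) {
        // Translate the wire-protocol error code to its enum constant.
        Errors error = Errors.forCode((short) 2);
        System.out.println(error);              // CORRUPT_MESSAGE
        System.out.println(error.exception());  // the matching CorruptRecordException
    }
}
{code}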
Given that, I am wondering what could have led to error code 2 and the CorruptRecordException above, and what the best way to mitigate them would be. Do the records somehow get corrupted on the wire? The exception message suggests seeking past the bad record; a rough sketch of that workaround is below.
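Something like this is what I have in mind (a sketch only: the 0.11 consumer reports the failing partition and offset only in the exception text, so the message parsing here is a guess, and skipping the record means accepting the loss of that offset):
{code:java}
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

public class SeekPastCorruptRecord {
    // Hypothetical: extract "partition <topic>-<p> at offset <o>" from the message
    // text. Fragile if the topic name itself ends in "-<digits>".
    private static final Pattern CORRUPT_RECORD =
            Pattern.compile("partition (.+)-(\\d+) at offset (\\d+)");

    public static ConsumerRecords<byte[], byte[]> pollSkippingCorrupt(
            KafkaConsumer<byte[], byte[]> consumer) {
        try {
            return consumer.poll(1000L);
        } catch (KafkaException e) {
            // The offset sometimes only appears in the cause's message.
            String text = e.getMessage() + " "
                    + (e.getCause() == null ? "" : e.getCause().getMessage());
            Matcher m = CORRUPT_RECORD.matcher(text);
            if (m.find()) {
                TopicPartition tp =
                        new TopicPartition(m.group(1), Integer.parseInt(m.group(2)));
                long corruptOffset = Long.parseLong(m.group(3));
                consumer.seek(tp, corruptOffset + 1); // skip the corrupt record
                return consumer.poll(1000L);
            }
            throw e; // not a corrupt-record error we know how to skip
        }
    }
}
{code}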

 


>  Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 
> while fetching data
> ---------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6977
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6977
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.1
>            Reporter: Eugen Feller
>            Priority: Blocker
>              Labels: streams
>
> We are running Kafka Streams 0.11.0.1 with Kafka Broker 0.10.2.1 and 
> constantly run into the following exception: 
> {code:java}
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> partition assignment took 40 ms.
> current active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 0_15, 
> 0_17, 0_20, 0_21, 0_23, 0_25, 0_28, 0_2, 0_6, 0_7, 0_8, 0_9, 0_11, 0_16, 
> 0_18, 0_19, 0_22, 0_24, 0_26, 0_27, 0_29, 0_30, 0_31]
> current standby tasks: []
> previous active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 
> 0_15, 0_17, 0_20, 0_21, 0_23, 0_25, 0_28]
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> State transition from PARTITIONS_ASSIGNED to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.KafkaStreams - stream-client 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e] State 
> transition from REBALANCING to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> ERROR org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> Encountered the following error during processing:
> java.lang.IllegalStateException: Unexpected error code 2 while fetching data
> at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886)
> at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> Shutting down
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> State transition from RUNNING to PENDING_SHUTDOWN.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka 
> producer with timeoutMillis = 9223372036854775807 ms.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> Stream thread shutdown complete
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> State transition from PENDING_SHUTDOWN to DEAD.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.KafkaStreams - stream-client 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4] State 
> transition from RUNNING to ERROR.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> WARN org.apache.kafka.streams.KafkaStreams - stream-client 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4] All stream 
> threads have died. The Kafka Streams instance will be in an error state and 
> should be closed.
> 6062195 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> FATAL com.zenreach.data.flows.visitstatsmongoexporter.MongoVisitStatsWriter$ 
> - Exiting main on uncaught exception
> java.lang.IllegalStateException: Unexpected error code 2 while fetching data
> at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886)
> at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> {code}
> Looks like the error is thrown here:
> [https://github.com/apache/kafka/blob/0.11.0.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L886]
> Before throwing that exception, our Kafka Streams job keeps rebalancing over and over. This is a simple job that reads data off Kafka and writes it back to MongoDB. It reads from a topic with 32 partitions and runs on AWS ECS with 32 instances (each with one stream thread). Any idea what could be going wrong? A minimal sketch of the job's shape is below.
> Thanks a lot.
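> Sketch only: the serdes, bootstrap servers, and the Mongo write below are placeholders, not our actual code.
> {code:java}
> import java.util.Properties;
>
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
>
> public class VisitStatsMongoWriter {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "visitstats-mongowriter");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
>         // One stream thread per instance; 32 ECS instances cover the 32 partitions.
>         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
>
>         // KStreamBuilder is the 0.11.x API (replaced by StreamsBuilder in 1.0+).
>         KStreamBuilder builder = new KStreamBuilder();
>         KStream<byte[], byte[]> stats =
>                 builder.stream(Serdes.ByteArray(), Serdes.ByteArray(), "our_stats_topic_0");
>         stats.foreach((key, value) -> writeToMongo(key, value)); // real job writes to MongoDB
>
>         new KafkaStreams(builder, props).start();
>     }
>
>     private static void writeToMongo(byte[] key, byte[] value) {
>         // hypothetical Mongo write, elided
>     }
> }
> {code}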



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
