[ https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508661#comment-16508661 ]
Eugen Feller edited comment on KAFKA-6977 at 6/11/18 8:13 PM: -------------------------------------------------------------- Hi [~mjsax]. Interesting. Thanks a lot. I think the topic itself is likely consumed by multiple downstream consumers. However, only this job actually consumes it for the purposes of writing out to a database. Just tested consuming the topic with plain KafkaConsumer and it works. I also went over all the stack traces seen in that context and found the following: {code:java} KafkaException: Record for partition our_stats_topic_0-17 at offset 217641273 is invalid, cause: Record is corrupt (stored crc = 3302026163, computed crc = 3432237873) 1 at maybeEnsureValid (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1002) 2 at nextFetchedRecord (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1059) 3 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1090) 4 at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944) 5 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567) 6 at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528) 7 at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086) 8 at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043) 9 at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536) 10 at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490) 11 at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480) 12 at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457) KafkaException: Received exception when fetching the next record from our_stats_topic_0-17. If needed, please seek past the record to continue consumption. 1 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1110) 2 at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944) 3 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567) 4 at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528) 5 at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086) 6 at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043) 7 at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536) 8 at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490) 9 at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480) 10 at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457) {code} In this particular instance I think the following stack trace was seen at the same time with error code 2: {code:java} CorruptRecordException: Record size is less than the minimum record overhead (14) KafkaException: Received exception when fetching the next record from our_stats_topic_0-16. If needed, please seek past the record to continue consumption. 1 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1076) 2 at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944) 3 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567) 4 at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528) 5 at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086) 6 at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043) 7 at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536) 8 at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490) 9 at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480) 10 at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457) {code} I am wondering what could have lead to error code 2 and maybe the above ConcurrentRecordException and what would be the best way to mitigate them? Do the records get corrupt on the wire somehow? was (Author: efeller): Hi [~mjsax]. Interesting. Thanks a lot. I think the topic itself is likely consumed by multiple downstream consumers. However, only this job actually consumes it for the purposes of writing out to a database. Just tested consuming the topic with plain KafkaConsumer and it works. I also went over all the stack traces seen in that context and found the following: {code:java} KafkaException: Record for partition our_stats_topic_0-17 at offset 217641273 is invalid, cause: Record is corrupt (stored crc = 3302026163, computed crc = 3432237873) 1 at maybeEnsureValid (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1002) 2 at nextFetchedRecord (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1059) 3 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1090) 4 at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944) 5 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567) 6 at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528) 7 at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086) 8 at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043) 9 at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536) 10 at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490) 11 at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480) 12 at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457) KafkaException: Received exception when fetching the next record from our_stats_topic_0-17. If needed, please seek past the record to continue consumption. 1 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1110) 2 at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944) 3 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567) 4 at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528) 5 at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086) 6 at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043) 7 at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536) 8 at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490) 9 at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480) 10 at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457) {code} In this particular instance I think the following stack trace was seen at the same time with error code 2: {code:java} CorruptRecordException: Record size is less than the minimum record overhead (14) KafkaException: Received exception when fetching the next record from our_stats_topic_0-16. If needed, please seek past the record to continue consumption. 1 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1076) 2 at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944) 3 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567) 4 at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528) 5 at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086) 6 at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043) 7 at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536) 8 at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490) 9 at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480) 10 at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457) {code} I am wondering what could have lead to error code 2 and maybe the above ConcurrentRecordException and what would be the best way to mitigate them? Do the records get corrupt on the wire somehow? > Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 > while fetching data > --------------------------------------------------------------------------------------------- > > Key: KAFKA-6977 > URL: https://issues.apache.org/jira/browse/KAFKA-6977 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.11.0.1 > Reporter: Eugen Feller > Priority: Blocker > Labels: streams > > We are running Kafka Streams 0.11.0.1 with Kafka Broker 0.10.2.1 and > constantly run into the following exception: > {code:java} > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > partition assignment took 40 ms. > current active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 0_15, > 0_17, 0_20, 0_21, 0_23, 0_25, 0_28, 0_2, 0_6, 0_7, 0_8, 0_9, 0_11, 0_16, > 0_18, 0_19, 0_22, 0_24, 0_26, 0_27, 0_29, 0_30, 0_31] > current standby tasks: [] > previous active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, > 0_15, 0_17, 0_20, 0_21, 0_23, 0_25, 0_28] > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > State transition from PARTITIONS_ASSIGNED to RUNNING. > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > INFO org.apache.kafka.streams.KafkaStreams - stream-client > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e] State > transition from REBALANCING to RUNNING. > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > ERROR org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > Encountered the following error during processing: > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > at > org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490) > java.lang.IllegalStateException: Unexpected error code 2 while fetching data > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > Shutting down > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > State transition from RUNNING to PENDING_SHUTDOWN. > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka > producer with timeoutMillis = 9223372036854775807 ms. > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > Stream thread shutdown complete > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > State transition from PENDING_SHUTDOWN to DEAD. > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.streams.KafkaStreams - stream-client > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4] State > transition from RUNNING to ERROR. > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > WARN org.apache.kafka.streams.KafkaStreams - stream-client > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4] All stream > threads have died. The Kafka Streams instance will be in an error state and > should be closed. > 6062195 > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > FATAL com.zenreach.data.flows.visitstatsmongoexporter.MongoVisitStatsWriter$ > - Exiting main on uncaught exception > java.lang.IllegalStateException: Unexpected error code 2 while fetching data > at > org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > {code} > Looks like the error is thrown here > [https://github.com/apache/kafka/blob/0.11.0.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L886] > Before giving that exception, our Kafka streams job keeps rebalancing and > rebalancing. This is a simple job that reads data of Kafka and writes it back > to MongoDB. It reads from a topic of 32 partitions and runs on AWS ECS with > 32 instances (each with one stream thread). Any idea what could be going > wrong? > Thanks a lot. -- This message was sent by Atlassian JIRA (v7.6.3#76005)