[ https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15302542#comment-15302542 ]
Greg Fodor commented on KAFKA-3758:
-----------------------------------

Also, the log is truncated at the top, so it starts at the point where we shut the broker off. If there is additional useful information earlier in the log that you think we could share, we're happy to attach it.

> KStream job fails to recover after Kafka broker stopped
> -------------------------------------------------------
>
>                 Key: KAFKA-3758
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3758
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Greg Fodor
>            Assignee: Guozhang Wang
>         Attachments: muon.log.1.gz
>
>
> We've been doing some testing of a fairly complex KStreams job, and under load
> it seems the job fails to rebalance and recover if we shut down one of the
> Kafka brokers. The test we were running had a 3-node Kafka cluster where each
> topic had a replication factor of at least 2, and we terminated one of the
> nodes.
> Attached is the full log. The root exception seems to be contention on the
> lock on the state directory. The job continues to try to recover but throws
> lock-related errors over and over. Restarting the job itself resolves
> the problem.
> org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> Caused by: java.io.IOException: Failed to lock the state directory: /muon/state/job-stream_photon_messages-1/2_82
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
>         ... 32 more

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
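The report attributes the failure to contention on the state-directory lock, raised as "Failed to lock the state directory" while a task is being created during a rebalance. The Java sketch below illustrates that mechanism in isolation, under stated assumptions: each task directory is guarded by an exclusive `java.nio` file lock, a new owner retries a few times and then gives up with an `IOException`, and restarting the job is modeled simply as the old owner closing its channel. The class and method names, the `.lock` file name, the retry count and the backoff are all hypothetical, chosen for illustration; this is a sketch of the general technique, not the Kafka Streams implementation.

```java
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical illustration of per-task state-directory locking; not Kafka Streams code.
final class StateDirLockSketch {

    // Hypothetical lock-file name inside each task directory.
    static final String LOCK_FILE_NAME = ".lock";

    /** One attempt at an exclusive lock on the task directory; returns null if it is already held. */
    static FileLock tryLockDir(File taskDir) throws IOException {
        Path lockPath = taskDir.toPath().resolve(LOCK_FILE_NAME);
        FileChannel channel = FileChannel.open(lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        try {
            FileLock lock = channel.tryLock(); // null if another program holds an overlapping lock
            if (lock == null) {
                channel.close();
            }
            return lock;
        } catch (OverlappingFileLockException e) {
            // Another channel in this same JVM holds the lock, e.g. a previous owner that never released it.
            channel.close();
            return null;
        } catch (IOException e) {
            channel.close();
            throw e;
        }
    }

    /** Retries a few times with a fixed pause, then fails with an error shaped like the one in the report. */
    static FileLock lockWithRetry(File taskDir, int retries, long backoffMs)
            throws IOException, InterruptedException {
        FileLock lock = tryLockDir(taskDir);
        while (lock == null && retries > 0) {
            Thread.sleep(backoffMs);
            retries--;
            lock = tryLockDir(taskDir);
        }
        if (lock == null) {
            throw new IOException("Failed to lock the state directory: " + taskDir);
        }
        return lock;
    }

    /** A second owner cannot take the lock while the first still holds it. */
    static boolean secondOwnerFailsWhileHeld() throws Exception {
        File taskDir = Files.createTempDirectory("task").toFile();
        FileLock held = tryLockDir(taskDir); // previous owner, still holding the lock
        try {
            lockWithRetry(taskDir, 2, 10).channel().close();
            return false; // unexpectedly acquired
        } catch (IOException expected) {
            return expected.getMessage().startsWith("Failed to lock the state directory");
        } finally {
            held.channel().close(); // closing the channel releases the lock
        }
    }

    /** Once the previous owner's channel is closed (as on a restart), the lock can be taken. */
    static boolean acquiresAfterRelease() throws Exception {
        File taskDir = Files.createTempDirectory("task").toFile();
        tryLockDir(taskDir).channel().close(); // previous owner goes away
        FileLock lock = lockWithRetry(taskDir, 2, 10);
        boolean acquired = lock.isValid();
        lock.channel().close();
        return acquired;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("fails while held: " + secondOwnerFailsWhileHeld());
        System.out.println("acquires after release: " + acquiresAfterRelease());
    }
}
```

Within a single JVM, `FileChannel.tryLock` throws `OverlappingFileLockException` rather than returning `null` when another channel in the same JVM already holds the lock, so the sketch treats both outcomes as "held by someone else". Running `main` should print `fails while held: true` followed by `acquires after release: true`, mirroring the observation that retries keep failing until the job is restarted.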