Modestas Vainius created KAFKA-8675:
---------------------------------------

             Summary: "Main" consumers are not unsubscribed on KafkaStreams.close()
                 Key: KAFKA-8675
                 URL: https://issues.apache.org/jira/browse/KAFKA-8675
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.2.1
            Reporter: Modestas Vainius


Hi!

It seems that {{KafkaStreams.close()}} never unsubscribes the "main" Kafka consumers. As far as I can tell, {{org.apache.kafka.streams.processor.internals.TaskManager#shutdown}} unsubscribes only the {{restoreConsumer}}. As a result, the Kafka group coordinator has to remove the consumer from the consumer group in a non-clean way. {{KafkaStreams.close()}} does {{close()}} those consumers, but that does not seem to be enough for a clean exit.

Kafka Streams connects to Kafka:
{code:java}
kafka | [2019-07-17 08:02:35,707] INFO [GroupCoordinator 1]: Preparing to rebalance group 1-streams-test in state PreparingRebalance with old generation 0 (__consumer_offsets-44) (reason: Adding new member 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db) (kafka.coordinator.group.GroupCoordinator)
kafka | [2019-07-17 08:02:35,717] INFO [GroupCoordinator 1]: Stabilized group 1-streams-test generation 1 (__consumer_offsets-44) (kafka.coordinator.group.GroupCoordinator)
kafka | [2019-07-17 08:02:35,730] INFO [GroupCoordinator 1]: Assignment received from leader for group 1-streams-test for generation 1 (kafka.coordinator.group.GroupCoordinator)
{code}
Processing finishes in 2 seconds, but 10 seconds later I see this in the Kafka logs:
{code:java}
kafka | [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Member 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db in group 1-streams-test has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
kafka | [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Preparing to rebalance group 1-streams-test in state PreparingRebalance with old generation 1 (__consumer_offsets-44) (reason: removing member 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
kafka | [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Group 1-streams-test with generation 2 is now empty (__consumer_offsets-44) (kafka.coordinator.group.GroupCoordinator)
{code}
The topology is similar to the [Kafka testing example|https://kafka.apache.org/22/documentation/streams/developer-guide/testing.html], but I ran it against a real Kafka instance (one node):
{code:java}
new Topology().with {
    it.addSource("sourceProcessor", "input-topic")
    it.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor")
    it.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("aggStore"),
            Serdes.String(),
            Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow aggregatorStore pre-populating
        "aggregator")
    it.addSink(
        "sinkProcessor",
        "result-topic",
        "aggregator"
    )
    it
}
{code}
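
To make the "after 10 seconds" observation concrete, here is a minimal sketch that computes the gap between two of the broker log timestamps quoted in this report. The class name {{HeartbeatGap}} and the method {{gapMillis}} are hypothetical helpers introduced only for illustration; they are not part of Kafka. The only assumption is that the timestamps follow the {{yyyy-MM-dd HH:mm:ss,SSS}} layout seen in the log lines.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not part of Kafka): measures the time between two broker log lines.
class HeartbeatGap {
    // Timestamp layout used in the broker log lines, e.g. "2019-07-17 08:02:35,730".
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Returns the number of milliseconds from the earlier timestamp to the later one.
    static long gapMillis(String earlier, String later) {
        LocalDateTime start = LocalDateTime.parse(earlier, LOG_TS);
        LocalDateTime end = LocalDateTime.parse(later, LOG_TS);
        return Duration.between(start, end).toMillis();
    }
}
```

Calling {{HeartbeatGap.gapMillis("2019-07-17 08:02:35,730", "2019-07-17 08:02:45,749")}} with the "Assignment received" and "has failed" timestamps yields 10019 ms. That is roughly what one would expect if the member was removed by a 10-second session timeout rather than by an explicit leave-group request, assuming {{session.timeout.ms}} was left at a 10-second value in this setup (the report does not state the configured value).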