Modestas Vainius created KAFKA-8675:
---------------------------------------

             Summary: "Main" consumers are not unsubscribed on 
KafkaStreams.close()
                 Key: KAFKA-8675
                 URL: https://issues.apache.org/jira/browse/KAFKA-8675
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.2.1
            Reporter: Modestas Vainius


Hi!

It seems that {{KafkaStreams.close()}} never unsubscribes the "main" Kafka 
consumers. As far as I can tell, 
{{org.apache.kafka.streams.processor.internals.TaskManager#shutdown}} 
unsubscribes only the {{restoreConsumer}}. As a result, the Kafka group 
coordinator has to remove the consumer from the consumer group in a non-clean 
way, on heartbeat expiration. {{KafkaStreams.close()}} does call {{close()}} on 
those consumers, but that does not seem to be enough for a clean exit.

Kafka Streams connects to Kafka:
{code:java}
kafka        | [2019-07-17 08:02:35,707] INFO [GroupCoordinator 1]: Preparing to rebalance group 1-streams-test in state PreparingRebalance with old generation 0 (__consumer_offsets-44) (reason: Adding new member 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db) (kafka.coordinator.group.GroupCoordinator)
kafka        | [2019-07-17 08:02:35,717] INFO [GroupCoordinator 1]: Stabilized group 1-streams-test generation 1 (__consumer_offsets-44) (kafka.coordinator.group.GroupCoordinator)
kafka        | [2019-07-17 08:02:35,730] INFO [GroupCoordinator 1]: Assignment received from leader for group 1-streams-test for generation 1 (kafka.coordinator.group.GroupCoordinator)
{code}
Processing finishes in 2 seconds, but 10 seconds after the group stabilized I see this in the Kafka broker logs:
{code:java}
kafka        | [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Member 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db in group 1-streams-test has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
kafka        | [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Preparing to rebalance group 1-streams-test in state PreparingRebalance with old generation 1 (__consumer_offsets-44) (reason: removing member 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
kafka        | [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Group 1-streams-test with generation 2 is now empty (__consumer_offsets-44) (kafka.coordinator.group.GroupCoordinator)
{code}
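For reference, the gap between the last join-phase log line and the removal can be computed from the two timestamps quoted above. The sketch below is only an illustration: the class and method names ({{HeartbeatGap}}, {{millisBetween}}) are hypothetical helpers, not part of Kafka. The result is close to 10000 ms, which would be consistent with the consumer's {{session.timeout.ms}} being 10000 ms here; that value is an assumption, since I did not check the effective configuration.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Kafka: measures the time between two broker log lines.
final class HeartbeatGap {
    // Timestamp layout used by the broker log lines quoted in this report.
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Returns the number of milliseconds elapsed from one log timestamp to another.
    static long millisBetween(String from, String to) {
        return Duration.between(
                LocalDateTime.parse(from, LOG_TS),
                LocalDateTime.parse(to, LOG_TS)).toMillis();
    }

    public static void main(String[] args) {
        // "Assignment received from leader" vs. "Member ... has failed"
        long gap = millisBetween("2019-07-17 08:02:35,730", "2019-07-17 08:02:45,749");
        System.out.println(gap + " ms"); // prints "10019 ms"
    }
}
```

So the member appears to stay in the group for roughly one full timeout after the application has already exited, instead of leaving right away.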
The topology is similar to the [Kafka testing example|https://kafka.apache.org/22/documentation/streams/developer-guide/testing.html], 
but I ran it against a real single-node Kafka instance:
{code:java}
            new Topology().with {
                it.addSource("sourceProcessor", "input-topic")
                it.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor")
                it.addStateStore(
                    // need to disable logging to allow aggregatorStore pre-populating
                    Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore("aggStore"),
                        Serdes.String(),
                        Serdes.Long()).withLoggingDisabled(),
                    "aggregator")
                it.addSink(
                    "sinkProcessor",
                    "result-topic",
                    "aggregator"
                )
                it
            }
{code}



