Modestas Vainius created KAFKA-8675:
---------------------------------------
Summary: "Main" consumers are not unsubsribed on
KafkaStreams.close()
Key: KAFKA-8675
URL: https://issues.apache.org/jira/browse/KAFKA-8675
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.2.1
Reporter: Modestas Vainius
Hi!
It seems that {{KafkaStreams.close()}} never unsubscribes "main" kafka
consumers. As far as I can tell,
{{org.apache.kafka.streams.processor.internals.TaskManager#shutdown}} does
unsubscribe only {{restoreConsumer}}. This results into Kafka Group coordinator
having to throw away consumer from the consumer group in a non-clean way.
{{KafkaStreams.close()}} does {{close()}} those consumers but it seems that is
not enough for clean exit.
Kafka Streams connects to Kafka:
{code:java}
kafka | [2019-07-17 08:02:35,707] INFO [GroupCoordinator 1]: Preparing
to rebalance group 1-streams-test in state PreparingRebalance with old
generation 0 (__consumer_offsets-44) (reason: Adding new member
1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db)
(kafka.coordinator.group.GroupCoordinator)
kafka | [2019-07-17 08:02:35,717] INFO [GroupCoordinator 1]: Stabilized
group 1-streams-test generation 1 (__consumer_offsets-44)
(kafka.coordinator.group.GroupCoordinator)
kafka | [2019-07-17 08:02:35,730] INFO [GroupCoordinator 1]: Assignment
received from leader for group 1-streams-test for generation 1
(kafka.coordinator.group.GroupCoordinator)
{code}
Processing finishes in 2 secs but after 10 seconds I see this in Kafka logs:
{code:java}
kafka | [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Member
1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db
in group 1-streams-test has failed, removing it from the group
(kafka.coordinator.group.GroupCoordinator)
kafka | [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Preparing
to rebalance group 1-streams-test in state PreparingRebalance with old
generation 1 (__consumer_offsets-44) (reason: removing member
1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db
on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
kafka | [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Group
1-streams-test with generation 2 is now empty (__consumer_offsets-44)
(kafka.coordinator.group.GroupCoordinator)
{code}
Topology is kind of similar to [kafka testing
example|https://kafka.apache.org/22/documentation/streams/developer-guide/testing.html]
but I tried on real kafka instance (one node):
{code:java}
new Topology().with {
it.addSource("sourceProcessor", "input-topic")
it.addProcessor("aggregator", new
CustomMaxAggregatorSupplier(), "sourceProcessor")
it.addStateStore(
Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("aggStore"),
Serdes.String(),
Serdes.Long()).withLoggingDisabled(), // need to
disable logging to allow aggregatorStore pre-populating
"aggregator")
it.addSink(
"sinkProcessor",
"result-topic",
"aggregator"
)
it
}
{code}
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)