Jacob Gur created KAFKA-4799:
--------------------------------
Summary: session timeout during event processing shuts down stream
Key: KAFKA-4799
URL: https://issues.apache.org/jira/browse/KAFKA-4799
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.10.1.1
Environment: Kafka Streams client running on OS X, with Docker Machine running the broker
Reporter: Jacob Gur
Priority: Critical
I have a simple Kafka Streams application like this:
{code:title=Part of my class|borderStyle=solid}
private <T> IConsumerSubscription buildSubscriptionStream(
        Class<T> clazz, Consumer<T> consumer, String group,
        Function<KStreamBuilder, KStream<String, String>> topicStreamFunc) {
    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> stream = topicStreamFunc.apply(builder);
    // Deserialize each record from JSON and hand it to the caller's consumer.
    stream.foreach((k, v) -> {
        try {
            T value = _jsonObjectMapper.mapFromJsonString(v, clazz);
            consumer.accept(value);
            Logger.trace("Consumed message {}", value);
        } catch (Throwable th) {
            Logger.warn("Error while consuming message", th);
        }
    });
    final KafkaStreams streams = new KafkaStreams(builder, constructProperties(group));
    streams.start();
    // The returned subscription handle just closes the streams instance.
    return streams::close;
}
{code}
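For context, the constructProperties(group) helper referenced above is not shown here; a minimal sketch of what it might contain (the broker address and serdes are placeholders, not the actual values):
{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

// Rough sketch of the constructProperties(group) helper used above; the real
// one is not shown in this report, so the broker address and serdes here are
// placeholders rather than the actual configuration.
private Properties constructProperties(String group) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, group); // application.id doubles as the consumer group id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    return props;
}
{code}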
There is just one client running this streams application.
If I run the client in a debugger with a breakpoint inside the event processor (i.e., inside the foreach lambda), and the debugger suspends all threads for more than about 10 seconds, then when I resume the application:
Actual behavior - the stream shuts down.
Expected behavior - the stream should recover: its partition might be revoked temporarily, but it should then be reassigned and processing should resume.
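The ~10-second window appears to line up with the default consumer session.timeout.ms of 10 seconds in 0.10.1.x: while the debugger suspends all threads, the heartbeat thread is paused too, so the broker expires the member's session. As a debugging-only workaround (not a fix), the timeout can presumably be raised via the streams properties, since consumer settings placed there are passed through to the embedded consumer; the helper name and value below are assumptions:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Hypothetical debugging-only variant of the properties helper: give the
// embedded consumer a longer session timeout so suspending all threads in a
// debugger does not immediately expire the group membership. 120000 ms is
// illustrative and must not exceed the broker's group.max.session.timeout.ms.
private Properties debugProperties(Properties base) {
    Properties props = new Properties();
    props.putAll(base); // e.g. the result of constructProperties(group)
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
    return props;
}
{code}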
It looks like what happens is this:
1) The Kafka consumer's session times out.
2) The partition is revoked.
3) The Streams library's rebalance listener tries to commit offsets, but the commit fails with a rebalance-related exception.
4) The stream shuts down.
Steps 3 and 4 occur in StreamThread's rebalance listener.
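For illustration only, here is a rough plain-consumer analogue of steps 1-4; this is not the actual StreamThread code, just the same commit-on-revocation pattern expressed with a bare consumer (class and generic types are made up for the sketch):
{code:java}
import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Committing inside onPartitionsRevoked after the session has already expired
// fails with CommitFailedException. In Streams the analogous commit happens in
// StreamThread's rebalance listener, and the failure takes the stream down.
public class CommitOnRevokeListener implements ConsumerRebalanceListener {
    private final Consumer<String, String> consumer;

    public CommitOnRevokeListener(Consumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Step 3: if the member was already timed out of the group (steps 1-2),
        // this throws org.apache.kafka.clients.consumer.CommitFailedException.
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // nothing to do on assignment in this sketch
    }
}
{code}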
It seems the stream should be more resilient and recover just like a regular KafkaConsumer would: its partition would be revoked, then assigned back, and it would resume processing from the last committed offset.
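For comparison, a minimal sketch of that kind of recovery with a plain KafkaConsumer (topic, group, and broker address are placeholders): a failed commit after a rebalance is logged away and the loop keeps polling, so the consumer rejoins the group and resumes from the last committed offset instead of shutting down:
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Plain-consumer sketch of the expected resilience: survive a session timeout
// by tolerating the failed commit and continuing to poll.
public class ResilientConsumerLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic")); // placeholder topic
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(100)) {
                    // process record.value() here
                }
                try {
                    consumer.commitSync();
                } catch (CommitFailedException e) {
                    // The group rebalanced underneath us (e.g. session timeout while
                    // suspended in a debugger). The next poll() rejoins the group and
                    // processing continues instead of shutting down.
                }
            }
        }
    }
}
{code}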
Is the current behavior intended and I'm just not understanding the design, or is this a bug?
Thanks!