[ https://issues.apache.org/jira/browse/KAFKA-4799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang resolved KAFKA-4799.
----------------------------------
Resolution: Fixed
Fix Version/s: 0.11.0.1
> session timeout during event processing shuts down stream
> ---------------------------------------------------------
>
> Key: KAFKA-4799
> URL: https://issues.apache.org/jira/browse/KAFKA-4799
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.1.1
> Environment: Kafka Streams client running on OS X, with Docker Machine
> running the broker
> Reporter: Jacob Gur
> Priority: Critical
> Fix For: 0.11.0.1
>
>
> I have a simple stream application like this:
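> {code:title=Part of my class|borderStyle=solid}
> // Imports used by this excerpt (Consumer is java.util.function.Consumer,
> // not the Kafka consumer; the Streams types are from the 0.10.x API):
> // import java.util.function.Consumer;
> // import java.util.function.Function;
> // import org.apache.kafka.streams.KafkaStreams;
> // import org.apache.kafka.streams.kstream.KStream;
> // import org.apache.kafka.streams.kstream.KStreamBuilder;
> private <T> IConsumerSubscription buildSubscriptionStream(
>         Class<T> clazz, Consumer<T> consumer, String group,
>         Function<KStreamBuilder, KStream<String, String>> topicStreamFunc)
> {
>     KStreamBuilder builder = new KStreamBuilder();
>     KStream<String, String> stream = topicStreamFunc.apply(builder);
>     stream.foreach((k, v) -> {
>         try {
>             // Deserialize the JSON value and hand it to the caller's callback.
>             T value = _jsonObjectMapper.mapFromJsonString(v, clazz);
>             consumer.accept(value);
>             Logger.trace("Consumed message {}", value);
>         } catch (Throwable th) {
>             // Per-record failures are logged and swallowed so they do not stop the stream.
>             Logger.warn("Error while consuming message", th);
>         }
>     });
>     final KafkaStreams streams = new KafkaStreams(builder, constructProperties(group));
>     streams.start();
>     // The returned subscription closes the KafkaStreams instance when invoked.
>     return streams::close;
> }
> {code}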
> Only one client instance runs this streams application.
> If I run the client in a debugger with a breakpoint on the event processor
> (i.e., inside the foreach lambda), and the debugger suspends all threads for
> more than roughly 10 seconds, then when I resume the application:
> Actual behavior: the stream shuts down.
> Expected behavior: the stream should recover. It may temporarily lose its
> partition, but it should then be reassigned the partition and resume.
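The roughly 10-second threshold in this reproduction is consistent with the consumer's default `session.timeout.ms` of 10000 ms in clients of this era, assuming the application does not override it. The following is a hypothetical, simplified model of the coordinator's liveness check, not Kafka code; `SessionTimeoutSketch` and `isExpired` are illustrative names. It shows why a pause that suspends every thread, including any heartbeat thread, longer than the session timeout gets the member evicted.

```java
// Hypothetical, simplified model of the group coordinator's liveness check.
// This is NOT Kafka's implementation; it only illustrates the timing.
final class SessionTimeoutSketch {
    // Assumed default consumer session.timeout.ms for clients of this era.
    static final long DEFAULT_SESSION_TIMEOUT_MS = 10_000L;

    // A member is considered failed once no heartbeat has arrived
    // for longer than the session timeout.
    static boolean isExpired(long lastHeartbeatMs, long nowMs, long sessionTimeoutMs) {
        return nowMs - lastHeartbeatMs > sessionTimeoutMs;
    }

    public static void main(String[] args) {
        long lastHeartbeatMs = 0L;
        // A debugger that suspends all threads also stops heartbeats,
        // so nothing is sent during the pause.
        long pauseMs = 12_000L;
        System.out.println(isExpired(lastHeartbeatMs, lastHeartbeatMs + pauseMs,
                DEFAULT_SESSION_TIMEOUT_MS)); // prints "true"
    }
}
```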
> It looks like this is what happens:
> 1) The Kafka client's session times out.
> 2) The partition is revoked.
> 3) The Streams library's rebalance listener tries to commit offsets, but the
> commit fails with a rebalance-related exception.
> 4) The stream shuts down.
> Steps 3 and 4 occur in StreamThread's rebalance listener.
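The sequence in steps 1 to 4 can be modeled in plain Java to show why the failure is fatal: the revocation callback commits, the commit is rejected because the member's group generation is stale after the session timeout, and the exception escapes the callback. This is a hypothetical sketch, not the actual `StreamThread` code; `FatalRevocationSketch`, `StaleGenerationCommitException`, and the generation numbers are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of steps 3 and 4. Names are illustrative,
// not Kafka Streams internals.
final class FatalRevocationSketch {
    // Stand-in for the exception a commit raises after the group has rebalanced.
    static final class StaleGenerationCommitException extends RuntimeException {
        StaleGenerationCommitException(String msg) { super(msg); }
    }

    static final List<String> events = new ArrayList<>();

    // A commit is only accepted if the member is still in the current generation.
    static void commitOffsets(int memberGeneration, int groupGeneration) {
        if (memberGeneration != groupGeneration) {
            throw new StaleGenerationCommitException("group has already rebalanced");
        }
        events.add("committed");
    }

    // Revocation callback that lets a commit failure propagate (step 3).
    static void onPartitionsRevoked(int memberGeneration, int groupGeneration) {
        events.add("revoked");
        commitOffsets(memberGeneration, groupGeneration);
    }

    // Returns true if the processing thread survives the rebalance.
    static boolean runThread(int memberGeneration, int groupGeneration) {
        try {
            onPartitionsRevoked(memberGeneration, groupGeneration);
            events.add("resumed");
            return true;
        } catch (RuntimeException e) {
            // Nothing handled the failure, so the thread shuts down (step 4).
            events.add("shutdown: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        // After the session timeout the group moved from generation 1 to 2.
        System.out.println(runThread(1, 2)); // prints "false"
        System.out.println(events); // prints "[revoked, shutdown: group has already rebalanced]"
    }
}
```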
> It seems that Streams should be more resilient and recover the way a plain
> KafkaConsumer would: its partition would be revoked, then reassigned, and
> processing would resume at the last committed offset.
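The recovery described here, where a plain consumer loses and regains its partition and then resumes from the last committed offset, can be sketched the same way. The callback names mirror the real `ConsumerRebalanceListener` methods `onPartitionsRevoked` and `onPartitionsAssigned`, but this class does not use the Kafka API; `ResilientRebalanceSketch`, its generation counter, and its in-memory offset map are hypothetical. The key choice is that a commit rejected during revocation is recorded rather than rethrown, so records processed after the last successful commit are simply processed again.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the expected recovery; not Kafka's API or implementation.
final class ResilientRebalanceSketch {
    private final Map<String, Long> committedOffsets = new HashMap<>();
    private long position;             // next offset to process
    private int memberGeneration = 1;  // group generation this member belongs to
    private boolean revocationCommitFailed;

    // A commit succeeds only if the member is still in the current generation.
    boolean tryCommit(String partition, int groupGeneration) {
        if (memberGeneration != groupGeneration) {
            return false; // rejected: the group already rebalanced
        }
        committedOffsets.put(partition, position);
        return true;
    }

    // Unlike the fatal path, a failed commit is recorded, not rethrown.
    void onPartitionsRevoked(String partition, int groupGeneration) {
        revocationCommitFailed = !tryCommit(partition, groupGeneration);
    }

    // Rejoin at the new generation and resume from the last committed offset.
    void onPartitionsAssigned(String partition, int groupGeneration) {
        memberGeneration = groupGeneration;
        position = committedOffsets.getOrDefault(partition, 0L);
    }

    void process(int records) { position += records; }
    long position() { return position; }
    boolean revocationCommitFailed() { return revocationCommitFailed; }

    public static void main(String[] args) {
        ResilientRebalanceSketch member = new ResilientRebalanceSketch();
        member.onPartitionsAssigned("topic-0", 1);
        member.process(5);
        member.tryCommit("topic-0", 1);           // committed offset is now 5
        member.process(3);                        // position 8, not yet committed
        // Session times out; the group moves to generation 2.
        member.onPartitionsRevoked("topic-0", 2); // commit fails, but not fatally
        member.onPartitionsAssigned("topic-0", 2);
        System.out.println(member.position());   // prints "5"
    }
}
```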
> Is the current behavior intended and I am misunderstanding the design, or is
> this a bug?
> Thanks!
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)