[ https://issues.apache.org/jira/browse/KAFKA-4043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15468278#comment-15468278 ]

Guozhang Wang commented on KAFKA-4043:
--------------------------------------

Hi [~gfodor], when a rebalance happens and some tasks (with their embedded topology) are shut down because their partitions are reassigned to other instances, the library closes the topology by calling `Processor.close()` in topology DAG order. As a user, you can customize `close()` to shut down any background threads or other resources you created.

Does this resolve the issue?

> User-defined handler for topology restart
> -----------------------------------------
>
>                 Key: KAFKA-4043
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4043
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.1
>            Reporter: Greg Fodor
>            Assignee: Guozhang Wang
>
> Since Kafka Streams is just a library, there's a lot of cool stuff we've been
> able to do that would be trickier if it were part of a larger
> cluster-oriented job execution system that had assumptions about the
> semantics of a job. One of the jobs we have uses Kafka Streams to do top-level
> data flow, and then one of our processors actually will kick off
> background threads to do work based upon the data flow state. Happy to fill
> in more details of our use case, but fundamentally the model is that we have
> a Kafka Streams data flow that is reading state from upstream, and that state
> dictates that work needs to be done, which results in a dedicated work thread
> being spawned by our job.
> This works great, but we're running into an issue when there is partition
> reassignment, since we have no way to detect this and cleanly shut down these
> threads. In our case, we'd like to shut down the background worker threads if
> there is a partition rebalance or if the job raises an exception and attempts
> to restart. In practice, what is happening is that we are getting duplicate
> threads for the same work on a partition rebalance.
> Implementation-wise, this seems like some type of event handler that can be
> attached to the topology at build time and that will be called when the data
> flow needs to rebalance or rebuild its task threads in general (ideally
> passing along as much information about the reason as possible). I could
> imagine this being factored similarly to
> KafkaStreams#setUncaughtExceptionHandler.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
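
For illustration, here is a rough sketch of the `close()` approach suggested in the comment: a processor that starts background work from `process()` and stops it in `close()`. To stay runnable without Kafka on the classpath, it uses a stand-in interface, `SimpleProcessor`, which is hypothetical and only mirrors the `process`/`close` shape; in a real topology the class would implement the Kafka Streams `Processor` interface instead. The names `WorkerProcessor`, `started`, and `workersStopped`, and the 5-second timeout, are likewise assumptions made for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class WorkerProcessorSketch {

    /** Hypothetical stand-in for the processor lifecycle; NOT the real Kafka interface. */
    interface SimpleProcessor<K, V> {
        void process(K key, V value);
        void close();
    }

    /** Starts a background worker per record and stops all workers on close(). */
    static class WorkerProcessor implements SimpleProcessor<String, String> {
        private final ExecutorService workers = Executors.newCachedThreadPool();
        final AtomicInteger started = new AtomicInteger();

        @Override
        public void process(String key, String value) {
            started.incrementAndGet();
            workers.submit(() -> {
                try {
                    // Placeholder for long-running work driven by the record.
                    Thread.sleep(Long.MAX_VALUE);
                } catch (InterruptedException e) {
                    // Interrupted by close(): restore the flag and exit.
                    Thread.currentThread().interrupt();
                }
            });
        }

        @Override
        public void close() {
            // Interrupt running workers so none outlive this task.
            workers.shutdownNow();
            try {
                // Assumed timeout; tune to how long workers need to wind down.
                workers.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        boolean workersStopped() {
            return workers.isTerminated();
        }
    }

    public static void main(String[] args) {
        WorkerProcessor processor = new WorkerProcessor();
        processor.process("partition-0", "start-work");
        // Simulates the library closing the task during a rebalance.
        processor.close();
        System.out.println("workers stopped: " + processor.workersStopped());
    }
}
```

Because the executor is owned by the processor instance, a task that is closed on one instance and recreated on another should not leave a duplicate worker behind, provided the worker code responds to interruption.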