Hi Martin,

I'm sorry for the delayed response. I'm not sure whether this is a YARN problem rather than a Samza issue. From the log messages it looks like Samza shuts down correctly, but the container is still in the "RUNNING" state. Here's the log output:
11:07:08,552 samza.container.SamzaContainer INFO : Entering run loop.
11:07:08,791 system.kafka.KafkaSystemProducer INFO : Creating a new producer for system kafka.
12:26:08,832 samza.container.RunLoop INFO : Shutdown has now been requested by tasks: Set(Partition [partition=0])
12:26:08,833 samza.container.RunLoop INFO : Shutdown requested.
12:26:08,833 samza.container.SamzaContainer INFO : Shutting down.
12:26:08,834 samza.container.SamzaContainer INFO : Shutting down consumer multiplexer.
12:26:08,835 system.kafka.BrokerProxy INFO : Shutting down BrokerProxy for minion08.ifi.uzh.ch:9092
12:26:08,901 system.kafka.BrokerProxy INFO : Got closed by interrupt exception in broker proxy thread.
12:26:08,902 system.kafka.BrokerProxy INFO : Shutting down due to interrupt.
12:26:08,903 samza.container.SamzaContainer INFO : Shutting down producer multiplexer.
12:26:08,909 samza.container.SamzaContainer INFO : Shutting down task instance stream tasks.
12:26:08,911 samza.container.SamzaContainer INFO : Shutting down task instance stores.
12:26:08,920 samza.container.SamzaContainer INFO : Shutting down offset manager.
12:26:08,921 samza.container.SamzaContainer INFO : Shutting down metrics reporters.
12:26:08,922 metrics.reporter.MetricsSnapshotReporter INFO : Stopping producer.
12:26:08,923 metrics.reporter.MetricsSnapshotReporter INFO : Stopping reporter timer.
12:26:08,925 samza.container.SamzaContainer INFO : Shutting down JVM metrics.
12:26:08,925 samza.container.SamzaContainer INFO : Shutdown complete.

Any idea on how to further debug this issue?

Cheers
Nicolas

On Mon, Jun 16, 2014 at 7:27 PM, Martin Kleppmann <[email protected]> wrote:

> Hi Nicolas,
>
> Thanks for trying the new feature. Yes, if you're only running one task
> instance in each container, then shutdown(CURRENT_TASK) should shut down
> the container (i.e. behave the same as shutdown(ALL_TASKS_IN_CONTAINER)).
>
> Do you see any messages like "Shutdown has now been requested by tasks:
> [...]" (at info level) in your container logs? They should indicate the
> partitions of the current container which have requested shutdown. If you
> compare that to the list of partitions assigned to the current container,
> the container should shut down when those sets of partitions are the same.
> If it doesn't do that, it's a bug.
>
> Regarding receiving the container name, try
> System.getenv("SAMZA_CONTAINER_NAME").
>
> Martin
>
> On 15 Jun 2014, at 12:57, Nicolas Bär <[email protected]> wrote:
>
> > Hi All
> >
> > I tried the new shutdown feature from SAMZA-253.
> >
> > It works well when one container is started with multiple threads on a
> > single node. But
> > `taskCoordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);`
> > does not seem to work when running a Samza StreamTask with 20 containers
> > on 4 machines using YARN. Each container handles only one partition, so
> > I'm consuming from a Kafka topic with 20 partitions.
> >
> > As far as I understand, this would mean that on every call of
> > `taskCoordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);` the
> > corresponding container would finish. This is not the case. In fact, all
> > containers are still running after this command has been called in every
> > task.
> >
> > In case it matters: the command is called in the window function of the
> > WindowableTask.
> >
> > YARN: 2.2.0
> > Samza: 0.7.0 branch (from last Friday:
> > 052de224a3256cc652032de5e804338b4dc92fe0)
> >
> > Any hints on how to further debug this?
> >
> > Second question: Is there any chance to receive the container name / number
> > within the task instance? It would make debugging a lot easier :)
> >
> > Cheers
> > Nicolas
> >
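The shutdown rule Martin describes (the container stops once the set of its partitions that have requested shutdown equals the set of partitions assigned to it) can be sketched as follows. This is a hypothetical illustration, not Samza's actual RunLoop code: the class name `ShutdownTracker`, its methods, and the use of plain integer partition IDs instead of Samza's `Partition` objects are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch (not Samza's RunLoop): tracks which partitions of one
 * container have requested shutdown via RequestScope.CURRENT_TASK and reports
 * whether the whole container should now shut down.
 */
final class ShutdownTracker {
    // Partitions assigned to this container (plain ints, a simplification).
    private final Set<Integer> assignedPartitions;
    // Partitions whose task has requested CURRENT_TASK shutdown so far.
    private final Set<Integer> requestedPartitions = new HashSet<>();

    ShutdownTracker(Set<Integer> assignedPartitions) {
        this.assignedPartitions = new HashSet<>(assignedPartitions);
    }

    /** Records a CURRENT_TASK shutdown request from the task owning this partition. */
    void requestShutdown(int partition) {
        if (!assignedPartitions.contains(partition)) {
            throw new IllegalArgumentException(
                "Partition " + partition + " is not assigned to this container");
        }
        requestedPartitions.add(partition);
    }

    /** The container should stop once the requesting set equals the assigned set. */
    boolean shouldShutDownContainer() {
        return requestedPartitions.equals(assignedPartitions);
    }

    public static void main(String[] args) {
        // Two tasks in one container: both must request shutdown.
        ShutdownTracker twoTasks =
            new ShutdownTracker(new HashSet<>(Arrays.asList(0, 1)));
        twoTasks.requestShutdown(0);
        System.out.println(twoTasks.shouldShutDownContainer()); // prints false
        twoTasks.requestShutdown(1);
        System.out.println(twoTasks.shouldShutDownContainer()); // prints true

        // One partition per container, as in the 20-container setup above.
        ShutdownTracker oneTask = new ShutdownTracker(Collections.singleton(0));
        oneTask.requestShutdown(0);
        System.out.println(oneTask.shouldShutDownContainer()); // prints true
    }
}
```

With one partition per container, a single CURRENT_TASK request is enough to make the sets equal, which is consistent with the "Shutdown has now been requested by tasks: Set(Partition [partition=0])" line being followed immediately by "Shutdown requested." in the log above.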
