[ https://issues.apache.org/jira/browse/SAMZA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Shanthoosh Venkataraman updated SAMZA-1647:
-------------------------------------------
    Description:
*Problem:*
There's a minor race condition in container shutdown triggered from the onJobModelExpired event handler. This race condition leads to an NPE and eventually kills the StreamProcessor.

*Reason:*
The JobModelExpired event handler, executed from the DebounceThread, does the following:
1. Invoke container shutdown.
2. Wait on a latch (L) for the configured task.shutdown.ms.
3. Log the containerId after the configured wait time.

The onContainerFailed event handler, executed from the ContainerThread, does the following:
1. Count down the latch (L).
2. Set the container field to null.

If step 2 of the onContainerFailed handler runs before step 3 of the JobModelExpired handler, the container field is already null when the JobModelExpired handler tries to log it. This results in an NPE.

*Relevant stacktrace:*
{code:java}
[p-0000000000-container-thread-0] ERROR org.apache.samza.processor.StreamProcessor - Container failed. Stopping the processor.
org.apache.samza.system.SystemProducerException: Flush failed. One or more batches of messages were not sent!
    at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply$mcV$sp(KafkaSystemProducer.scala:159)
    at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:130)
    at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:130)
    at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
    at org.apache.samza.system.kafka.KafkaSystemProducer.updateTimer(KafkaSystemProducer.scala:39)
    at org.apache.samza.system.kafka.KafkaSystemProducer.flush(KafkaSystemProducer.scala:130)
    at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
    at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at org.apache.samza.system.SystemProducers.flush(SystemProducers.scala:64)
    at org.apache.samza.task.TaskInstanceCollector.flush(TaskInstanceCollector.scala:69)
    at org.apache.samza.container.TaskInstance.commit(TaskInstance.scala:210)
    at org.apache.samza.container.SamzaContainer$$anonfun$shutdownTask$9.apply(SamzaContainer.scala:1024)
    at org.apache.samza.container.SamzaContainer$$anonfun$shutdownTask$9.apply(SamzaContainer.scala:1024)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at org.apache.samza.container.SamzaContainer.shutdownTask(SamzaContainer.scala:1024)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:731)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.samza.system.SystemProducerException: Failed to send message for Source: TaskName-SystemStreamPartition [TestSystemName, test-input-single-partition-topic-21194330-d462-4d98-8915-fbbac765d895, 0] on System:TestSystemName Topic:test-output-single-partition-topic-21194330-d462-4d98-8915-fbbac765d895
    at org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.onCompletion(KafkaSystemProducer.scala:109)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:201)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:185)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:600)
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:272)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:223)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
    ... 1 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-output-single-partition-topic-21194330-d462-4d98-8915-fbbac765d895-0: 30003 ms has passed since batch creation plus linger time
[p-0000000000-container-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down JobCoordinator from StreamProcessor
[debounce-thread-0] INFO org.apache.samza.processor.StreamProcessor - ShutdownComplete=true
[debounce-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down container done for pid=0000000000; complete =true
[debounce-thread-0] ERROR org.apache.samza.zk.ZkJobCoordinator - Encountered errors during job coordinator stop.
java.lang.NullPointerException
    at org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:235)
    at org.apache.samza.zk.ZkJobCoordinator.stop(ZkJobCoordinator.java:163)
    at org.apache.samza.zk.ZkJobCoordinator$ZkSessionStateChangedListener.lambda$handleStateChanged$0(ZkJobCoordinator.java:449)
    at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:153)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}

  was:
*Problem:*
There's a minor race condition in container shutdown triggered from the onJobModelExpired event handler. This race condition leads to an NPE and eventually kills the StreamProcessor.

*Reason:*
Following is the JobModelExpired event handler logic executed from the DebounceThread:
1. Invoke container shutdown.
2. Wait for configured task.shutdown.ms through countDownLatch.
3. Log the containerId after the configured wait time.

Following is the onContainerFailed event handler logic executed from the ContainerThread:
1. pull down the countdownLatch.
2. Set the container field to null.

If the Step 2. of containerFailed handler is executed before the step 3 of JobModelExpired handler, container value will be set null before we log it in JobModelExpired handler. This results in an NPE.

*Relevant stacktrace:*
{code:java}
[p-0000000000-container-thread-0] ERROR org.apache.samza.processor.StreamProcessor - Container failed. Stopping the processor.
org.apache.samza.system.SystemProducerException: Flush failed. One or more batches of messages were not sent!
    at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply$mcV$sp(KafkaSystemProducer.scala:159)
    at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:130)
    at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:130)
    at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
    at org.apache.samza.system.kafka.KafkaSystemProducer.updateTimer(KafkaSystemProducer.scala:39)
    at org.apache.samza.system.kafka.KafkaSystemProducer.flush(KafkaSystemProducer.scala:130)
    at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
    at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at org.apache.samza.system.SystemProducers.flush(SystemProducers.scala:64)
    at org.apache.samza.task.TaskInstanceCollector.flush(TaskInstanceCollector.scala:69)
    at org.apache.samza.container.TaskInstance.commit(TaskInstance.scala:210)
    at org.apache.samza.container.SamzaContainer$$anonfun$shutdownTask$9.apply(SamzaContainer.scala:1024)
    at org.apache.samza.container.SamzaContainer$$anonfun$shutdownTask$9.apply(SamzaContainer.scala:1024)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at org.apache.samza.container.SamzaContainer.shutdownTask(SamzaContainer.scala:1024)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:731)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.samza.system.SystemProducerException: Failed to send message for Source: TaskName-SystemStreamPartition [TestSystemName, test-input-single-partition-topic-21194330-d462-4d98-8915-fbbac765d895, 0] on System:TestSystemName Topic:test-output-single-partition-topic-21194330-d462-4d98-8915-fbbac765d895
    at org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.onCompletion(KafkaSystemProducer.scala:109)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:201)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:185)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:600)
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:272)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:223)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
    ... 1 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-output-single-partition-topic-21194330-d462-4d98-8915-fbbac765d895-0: 30003 ms has passed since batch creation plus linger time
[p-0000000000-container-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down JobCoordinator from StreamProcessor
[debounce-thread-0] INFO org.apache.samza.processor.StreamProcessor - ShutdownComplete=true
[debounce-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down container done for pid=0000000000; complete =true
[debounce-thread-0] ERROR org.apache.samza.zk.ZkJobCoordinator - Encountered errors during job coordinator stop.
java.lang.NullPointerException
    at org.apache.samza.processor.StreamProcessor$1.onJobModelExpired(StreamProcessor.java:235)
    at org.apache.samza.zk.ZkJobCoordinator.stop(ZkJobCoordinator.java:163)
    at org.apache.samza.zk.ZkJobCoordinator$ZkSessionStateChangedListener.lambda$handleStateChanged$0(ZkJobCoordinator.java:449)
    at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:153)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}

> Fix NPE in JobModelExpired event handler.
> -----------------------------------------
>
>                 Key: SAMZA-1647
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1647
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Shanthoosh Venkataraman
>            Priority: Major
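
The ordering described under *Reason* can be sketched in isolation. The class below is a hypothetical stand-in, not the actual StreamProcessor code: `ShutdownRaceSketch`, `Container`, `onContainerFailed`, `onJobModelExpired` and their signatures are illustrative assumptions. It shows one possible way to avoid the NPE (take a local snapshot of the container reference before waiting on the latch); the ticket itself does not say which fix was applied. The shutdown action is passed in as a `Runnable` so the worst-case interleaving can be forced deterministically.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the two handlers; names are assumptions, not Samza's API.
class ShutdownRaceSketch {
    // Minimal stand-in for a container that exposes an id.
    static final class Container {
        private final String id;
        Container(String id) { this.id = id; }
        String getId() { return id; }
    }

    // Shared field: cleared by the container thread, read by the debounce thread.
    private volatile Container container;
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    ShutdownRaceSketch(Container container) { this.container = container; }

    boolean hasContainer() { return container != null; }

    // Container-thread side: step 1 counts down the latch, step 2 clears the field.
    void onContainerFailed() {
        shutdownLatch.countDown();
        container = null;
    }

    // Debounce-thread side. Reading the shared field again after the wait is what
    // can hit null; using the local snapshot taken up front avoids that.
    String onJobModelExpired(Runnable shutdownAction, long shutdownMs) {
        Container local = container;          // snapshot before anything can clear it
        if (local == null) {
            return "No container to shut down";
        }
        shutdownAction.run();                 // step 1: invoke container shutdown
        boolean complete;
        try {                                 // step 2: wait on the latch
            complete = shutdownLatch.await(shutdownMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            complete = false;
        }
        // step 3: log using the snapshot, never the shared field
        return "Shutting down container done for pid=" + local.getId() + "; complete=" + complete;
    }

    public static void main(String[] args) {
        ShutdownRaceSketch processor = new ShutdownRaceSketch(new Container("0000000000"));
        // Worst case: the failure handler finishes both of its steps before the wait returns.
        System.out.println(processor.onJobModelExpired(processor::onContainerFailed, 1000L));
    }
}
```

With this arrangement the shared field is already null by the time the log message is built, yet the message is still produced from the snapshot. Whether a snapshot, a null check, or synchronizing the two handlers is preferable depends on the surrounding code.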