PanTheMan opened a new pull request #1287: SAMZA-2317: ProcessJob does not call 
CoordinatorStreamStore.close()
URL: https://github.com/apache/samza/pull/1287
 
 
   Symptom: When deploying a job in dev, users repeatedly see the following 
error message in their logs:
   ```
   2019-09-11 14:39:37.193 [Finalizer] [] LifecycleAwareConsumer [ERROR] kafka consumer allocated and not closed (see attached exception for point of allocation)
   java.lang.Throwable
           at com.linkedin.kafka.linkedinclients.decorators.LifecycleAwareConsumer.<init>(LifecycleAwareConsumer.java:54)
           at com.linkedin.kafka.clients.factory.RawKafkaConsumerFactoryFactory$RawKafkaConsumerBuilder.apply(RawKafkaConsumerFactoryFactory.java:72)
           at com.linkedin.kafka.clients.factory.RawKafkaConsumerFactoryFactory$RawKafkaConsumerBuilder.apply(RawKafkaConsumerFactoryFactory.java:44)
           at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemFactory.getKafkaConsumer(SamzaRawLiKafkaSystemFactory.java:135)
           at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemFactory.getAdmin(SamzaRawLiKafkaSystemFactory.java:104)
           at org.apache.samza.config.SystemConfig.lambda$getSystemAdmins$0(SystemConfig.java:97)
           at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
           at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
           at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699)
           at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
           at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
           at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
           at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
           at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
           at org.apache.samza.config.SystemConfig.getSystemAdmins(SystemConfig.java:95)
           at org.apache.samza.system.SystemAdmins.<init>(SystemAdmins.java:38)
           at org.apache.samza.job.JobRunner.run(JobRunner.scala:82)
           at org.apache.samza.runtime.RemoteApplicationRunner.lambda$run$0(RemoteApplicationRunner.java:76)
           at java.util.ArrayList.forEach(ArrayList.java:1257)
           at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:73)
           at org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:54)
           at org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
   2019-09-11 14:39:37.193 [kafka_admin_consumer-cfg_demo_yc_job-i001_yuhcheng_mn1_linkedin_biz-auditor] [] KafkaProducer [INFO] [Producer clientId=kafka_admin_consumer-cfg_demo_yc_job-i001_yuhcheng_mn1_linkedin_biz-auditor] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
   2019-09-11 14:39:37.194 [Finalizer] [] LiKafkaConsumerImpl [INFO] Shutting down in PT1S...
   2019-09-11 14:39:37.194 [kafka_admin_consumer-cfg_demo_yc_job-i001_yuhcheng_mn1_linkedin_biz-auditor] [] AbstractAuditor [ERROR] Auditor encounter exception.
   org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
           at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1168)
           at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1126)
           at com.linkedin.kafka.linkedinclients.auditing.TmeAuditor.onClosed(TmeAuditor.java:366)
           at com.linkedin.kafka.clients.auditing.abstractimpl.AbstractAuditor.run(AbstractAuditor.java:157)
   Caused by: java.lang.InterruptedException
           at java.lang.Object.wait(Native Method)
           at java.lang.Thread.join(Thread.java:1252)
           at java.lang.Thread.join(Thread.java:1326)
           at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1166)
           ... 3 more
   ```
   This error appears multiple times in the job's logs but does not actually 
affect the job's performance or correctness. It misleads users into believing 
that any job failure is caused by this message.
   
   Cause: 
   All Kafka consumers have a finalize method. When the garbage collector 
determines that there are no more references to a consumer, the finalizer 
tries to close it. For ProcessJob, a CoordinatorStreamStore is initialized in 
the ProcessJobFactory and then wrapped in a 
NameSpaceAwareCoordinatorStreamStore. Later, when the ProcessJob is done, 
`close()` is called on the NameSpaceAwareCoordinatorStreamStore, but this does 
not close the original CoordinatorStreamStore. The original store is therefore 
never closed, and once nothing references it any longer, the finalize method 
reports the error above.
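
The wrapper behavior described above can be illustrated with a minimal, self-contained sketch. Every type here (`SketchStore`, `UnderlyingStore`, `NamespacedWrapper`, `WrapperLeakDemo`) is a hypothetical stand-in, not Samza's actual class; the sketch only assumes, as stated in the description, that closing the wrapper does not propagate to the store it wraps.

```java
// Hypothetical stand-ins for illustration only; these are not Samza's real classes.
interface SketchStore extends AutoCloseable {
    @Override
    void close(); // narrowed so callers need not handle checked exceptions

    boolean isClosed();
}

/** Plays the role of the underlying store that owns a Kafka consumer. */
final class UnderlyingStore implements SketchStore {
    private boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }

    @Override
    public boolean isClosed() {
        return closed;
    }
}

/** Plays the role of the namespace-aware wrapper whose close() does not propagate. */
final class NamespacedWrapper implements SketchStore {
    private final SketchStore delegate;
    private boolean closed = false;

    NamespacedWrapper(SketchStore delegate) {
        this.delegate = delegate;
    }

    @Override
    public void close() {
        // Only the wrapper's own state changes; the delegate is left open.
        closed = true;
    }

    @Override
    public boolean isClosed() {
        return closed;
    }
}

final class WrapperLeakDemo {
    public static void main(String[] args) {
        UnderlyingStore underlying = new UnderlyingStore();
        NamespacedWrapper wrapper = new NamespacedWrapper(underlying);

        wrapper.close();

        System.out.println("wrapper closed: " + wrapper.isClosed());       // true
        System.out.println("underlying closed: " + underlying.isClosed()); // false
    }
}
```

In this sketch the underlying store stays open after the wrapper is closed, which is the state a finalizer-based leak check would later report.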
   
   Changes: ProcessJob now takes the CoordinatorStreamStore as an additional 
parameter and calls `close()` on it when the job is done running. Holding the 
store in ProcessJob also keeps a reference to it alive after the ProcessJob 
starts. TestProcessJob is updated to verify that the store is closed properly.
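
As a rough sketch of the shape of this change (not the actual patch), the job can hold the store it is given and close it once its work finishes. The names `ClosableStore`, `RecordingStore`, and `SketchProcessJob` are hypothetical, and closing in a `finally` block is an assumption made here for illustration; the real ProcessJob may structure this differently.

```java
// Hypothetical sketch; the real ProcessJob and CoordinatorStreamStore differ.
interface ClosableStore {
    void close();

    boolean isClosed();
}

/** Test double that records whether close() was called. */
final class RecordingStore implements ClosableStore {
    private boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }

    @Override
    public boolean isClosed() {
        return closed;
    }
}

/** Holds the store for the job's lifetime and closes it when the work ends. */
final class SketchProcessJob {
    private final Runnable work;
    private final ClosableStore coordinatorStore;

    SketchProcessJob(Runnable work, ClosableStore coordinatorStore) {
        this.work = work;
        this.coordinatorStore = coordinatorStore;
    }

    void runToCompletion() {
        try {
            work.run();
        } finally {
            // Assumption: close the store whether the work succeeded or threw.
            coordinatorStore.close();
        }
    }
}

final class SketchProcessJobDemo {
    public static void main(String[] args) {
        RecordingStore store = new RecordingStore();
        new SketchProcessJob(() -> { }, store).runToCompletion();
        System.out.println("store closed: " + store.isClosed()); // true
    }
}
```

A check in the spirit of the updated TestProcessJob would run the job with a recording store like the one above and assert that `isClosed()` returns true afterwards.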
   
   Tests: To reproduce this error, deploy any job running the latest version of 
Samza and look in the .out log file.
