[ 
https://issues.apache.org/jira/browse/ATLAS-1944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nixon Rodrigues updated ATLAS-1944:
-----------------------------------
    Attachment: ATLAS-1944.1.patch

Background: Stopping NotificationHookConsumer threw a 
ConcurrentModificationException while shutting down the consumer thread. The 
thread's stop method called the Kafka consumer's close method while the run 
method of HookConsumer was still consuming Kafka messages by polling the Kafka 
server. KafkaConsumer is not safe for multi-threaded access, so closing it from 
one thread while another thread is polling causes the 
ConcurrentModificationException.

Fix:

Added a try/finally block in the run method that wraps the while loop. When the 
loop ends, the finally block calls the consumer's close method, which ensures 
that close is called only after consuming has ended.
Replaced the ShutdownThread interface, which was delaying the consumer's 
shutdown, with the Runnable interface used earlier.
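
The pattern described in the fix can be sketched roughly as below. This is an 
illustration only, not the code from the attached patch. SingleThreadConsumer 
is a hypothetical stand-in that mimics KafkaConsumer's single-thread check, and 
HookConsumerSketch, shouldRun and shutdown() are assumed names. The point is 
that the stopping thread only flips a flag, and close() runs on the polling 
thread once the loop has ended.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a consumer that, like KafkaConsumer,
// rejects access from a second thread while a call is in progress.
class SingleThreadConsumer {
    private final AtomicReference<Thread> owner = new AtomicReference<>();
    private volatile Thread closedBy;

    private void acquire() {
        if (!owner.compareAndSet(null, Thread.currentThread())) {
            throw new ConcurrentModificationException(
                    "consumer is not safe for multi-threaded access");
        }
    }

    private void release() {
        owner.set(null);
    }

    // Simulates a blocking poll by sleeping while holding ownership.
    void poll(long timeoutMs) throws InterruptedException {
        acquire();
        try {
            Thread.sleep(timeoutMs);
        } finally {
            release();
        }
    }

    void close() {
        acquire();
        try {
            closedBy = Thread.currentThread();
        } finally {
            release();
        }
    }

    boolean isClosed() {
        return closedBy != null;
    }

    Thread closedBy() {
        return closedBy;
    }
}

// Sketch of the consumer thread body: the loop is wrapped in try/finally
// so that close() is called only after polling has stopped.
class HookConsumerSketch implements Runnable {
    private final SingleThreadConsumer consumer;
    private final AtomicBoolean shouldRun = new AtomicBoolean(true);

    HookConsumerSketch(SingleThreadConsumer consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        try {
            while (shouldRun.get()) {
                consumer.poll(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Runs on the polling thread, never concurrently with poll().
            consumer.close();
        }
    }

    // Called from the stopping thread; it never touches the consumer.
    void shutdown() {
        shouldRun.set(false);
    }
}
```

With this shape, a caller stops the consumer by calling shutdown() and then 
joining the thread, so shutdown waits for at most one poll timeout before the 
consumer is closed.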


Reference:
https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Review Request
https://reviews.apache.org/r/61274/diff/1#index_header

> NotificationHookConsumer throws exception while shutting down the consumer 
> thread
> ---------------------------------------------------------------------------------
>
>                 Key: ATLAS-1944
>                 URL: https://issues.apache.org/jira/browse/ATLAS-1944
>             Project: Atlas
>          Issue Type: Bug
>          Components:  atlas-core
>    Affects Versions: 0.9-incubating
>            Reporter: Ayub Pathan
>            Assignee: Ashutosh Mestry
>            Priority: Critical
>             Fix For: 0.9-incubating, 0.8.1-incubating
>
>         Attachments: ATLAS-1944.1.patch, ATLAS-1944.patch
>
>
> NotificationHookConsumer throws the exception below while shutting down the 
> consumer thread. This issue was possibly introduced by this commit:
> https://github.com/apache/incubator-atlas/commit/0e7f8ea4603c858cc295259bbd1a22314b732f62
> CC [~nixonrodrigues]
> {noformat}
> 2017-07-12 01:26:09,743 WARN  - [pool-1-thread-1:] ~ Error stopping service org.apache.atlas.notification.NotificationHookConsumer (Services:69)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>         at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1557)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1467)
>         at org.apache.atlas.kafka.AtlasKafkaConsumer.close(AtlasKafkaConsumer.java:88)
>         at org.apache.atlas.notification.NotificationHookConsumer$HookConsumer.stop(NotificationHookConsumer.java:384)
>         at org.apache.atlas.notification.NotificationHookConsumer.stopConsumerThreads(NotificationHookConsumer.java:172)
>         at org.apache.atlas.notification.NotificationHookConsumer.stop(NotificationHookConsumer.java:155)
>         at org.apache.atlas.service.Services.stop(Services.java:67)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:366)
>         at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeDestroyMethods(InitDestroyAnnotationBeanPostProcessor.java:325)
>         at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeDestruction(InitDestroyAnnotationBeanPostProcessor.java:154)
>         at org.springframework.beans.factory.support.DisposableBeanAdapter.destroy(DisposableBeanAdapter.java:253)
>         at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroyBean(DefaultSingletonBeanRegistry.java:578)
>         at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingleton(DefaultSingletonBeanRegistry.java:554)
>         at org.springframework.beans.factory.support.DefaultListableBeanFactory.destroySingleton(DefaultListableBeanFactory.java:961)
>         at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingletons(DefaultSingletonBeanRegistry.java:523)
>         at org.springframework.beans.factory.support.DefaultListableBeanFactory.destroySingletons(DefaultListableBeanFactory.java:968)
>         at org.springframework.context.support.AbstractApplicationContext.destroyBeans(AbstractApplicationContext.java:1033)
>         at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1009)
>         at org.springframework.context.support.AbstractApplicationContext.close(AbstractApplicationContext.java:961)
>         at org.springframework.web.context.ContextLoader.closeWebApplicationContext(ContextLoader.java:583)
>         at org.springframework.web.context.ContextLoaderListener.contextDestroyed(ContextLoaderListener.java:116)
>         at org.eclipse.jetty.server.handler.ContextHandler.callContextDestroyed(ContextHandler.java:808)
>         at org.eclipse.jetty.servlet.ServletContextHandler.callContextDestroyed(ServletContextHandler.java:457)
>         at org.eclipse.jetty.server.handler.ContextHandler.doStop(ContextHandler.java:842)
>         at org.eclipse.jetty.servlet.ServletContextHandler.doStop(ServletContextHandler.java:215)
>         at org.eclipse.jetty.webapp.WebAppContext.doStop(WebAppContext.java:529)
>         at org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
>         at org.eclipse.jetty.util.component.ContainerLifeCycle.stop(ContainerLifeCycle.java:143)
>         at org.eclipse.jetty.util.component.ContainerLifeCycle.doStop(ContainerLifeCycle.java:162)
>         at org.eclipse.jetty.server.handler.AbstractHandler.doStop(AbstractHandler.java:73)
>         at org.eclipse.jetty.server.Server.doStop(Server.java:456)
>         at org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
