[
https://issues.apache.org/jira/browse/ATLAS-1944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Nixon Rodrigues updated ATLAS-1944:
-----------------------------------
Attachment: ATLAS-1944.1.patch
Background: Stoping NotificationHookConsumer was throwing
ConcurrentModificationException exception while shutting down the consumer
thread. The stop method of thread was calling the kafkacosumer.stop method
while in run method of HookConsumer which is busy in consuming the kafka
message by polling kafka server. This simultaneous act of stopping and polling
causes ConcurrentModificationException.
Fix:-
Added try finally block in run method which wraps the while loop and when loop
ends the finally blocks calls the consumer close method which ensures that
close is called after consuming is ended.
Replaced the ShutdownThread interface which was delaying the shutdown process
of consumer with Runnable interface used earlier.
Reference : -
https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
Review Request
https://reviews.apache.org/r/61274/diff/1#index_header
> NotificationHookConsumer throws exception while shutting down the consumer
> thread
> ---------------------------------------------------------------------------------
>
> Key: ATLAS-1944
> URL: https://issues.apache.org/jira/browse/ATLAS-1944
> Project: Atlas
> Issue Type: Bug
> Components: atlas-core
> Affects Versions: 0.9-incubating
> Reporter: Ayub Pathan
> Assignee: Ashutosh Mestry
> Priority: Critical
> Fix For: 0.9-incubating, 0.8.1-incubating
>
> Attachments: ATLAS-1944.1.patch, ATLAS-1944.patch
>
>
> NotificationHookConsumer throws below exception while shutting down the
> consumer thread, this issue is possibly after this commit.
> https://github.com/apache/incubator-atlas/commit/0e7f8ea4603c858cc295259bbd1a22314b732f62
> CC [~nixonrodrigues]
> {noformat}
> 2017-07-12 01:26:09,743 WARN - [pool-1-thread-1:] ~ Error stopping service
> org.apache.atlas.notification.NotificationHookConsumer (Services:69)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1557)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1467)
> at
> org.apache.atlas.kafka.AtlasKafkaConsumer.close(AtlasKafkaConsumer.java:88)
> at
> org.apache.atlas.notification.NotificationHookConsumer$HookConsumer.stop(NotificationHookConsumer.java:384)
> at
> org.apache.atlas.notification.NotificationHookConsumer.stopConsumerThreads(NotificationHookConsumer.java:172)
> at
> org.apache.atlas.notification.NotificationHookConsumer.stop(NotificationHookConsumer.java:155)
> at org.apache.atlas.service.Services.stop(Services.java:67)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:366)
> at
> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeDestroyMethods(InitDestroyAnnotationBeanPostProcessor.java:325)
> at
> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeDestruction(InitDestroyAnnotationBeanPostProcessor.java:154)
> at
> org.springframework.beans.factory.support.DisposableBeanAdapter.destroy(DisposableBeanAdapter.java:253)
> at
> org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroyBean(DefaultSingletonBeanRegistry.java:578)
> at
> org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingleton(DefaultSingletonBeanRegistry.java:554)
> at
> org.springframework.beans.factory.support.DefaultListableBeanFactory.destroySingleton(DefaultListableBeanFactory.java:961)
> at
> org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingletons(DefaultSingletonBeanRegistry.java:523)
> at
> org.springframework.beans.factory.support.DefaultListableBeanFactory.destroySingletons(DefaultListableBeanFactory.java:968)
> at
> org.springframework.context.support.AbstractApplicationContext.destroyBeans(AbstractApplicationContext.java:1033)
> at
> org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1009)
> at
> org.springframework.context.support.AbstractApplicationContext.close(AbstractApplicationContext.java:961)
> at
> org.springframework.web.context.ContextLoader.closeWebApplicationContext(ContextLoader.java:583)
> at
> org.springframework.web.context.ContextLoaderListener.contextDestroyed(ContextLoaderListener.java:116)
> at
> org.eclipse.jetty.server.handler.ContextHandler.callContextDestroyed(ContextHandler.java:808)
> at
> org.eclipse.jetty.servlet.ServletContextHandler.callContextDestroyed(ServletContextHandler.java:457)
> at
> org.eclipse.jetty.server.handler.ContextHandler.doStop(ContextHandler.java:842)
> at
> org.eclipse.jetty.servlet.ServletContextHandler.doStop(ServletContextHandler.java:215)
> at
> org.eclipse.jetty.webapp.WebAppContext.doStop(WebAppContext.java:529)
> at
> org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
> at
> org.eclipse.jetty.util.component.ContainerLifeCycle.stop(ContainerLifeCycle.java:143)
> at
> org.eclipse.jetty.util.component.ContainerLifeCycle.doStop(ContainerLifeCycle.java:162)
> at
> org.eclipse.jetty.server.handler.AbstractHandler.doStop(AbstractHandler.java:73)
> at org.eclipse.jetty.server.Server.doStop(Server.java:456)
> at
> org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)