Pierre Villard created NIFI-5838:
------------------------------------
Summary: Misconfigured Kite processors can block NiFi
Key: NIFI-5838
URL: https://issues.apache.org/jira/browse/NIFI-5838
Project: Apache NiFi
Issue Type: Bug
Components: Extensions
Reporter: Pierre Villard
Assignee: Pierre Villard
I faced the following situation today: no way to access the NiFi UI (it was
just hanging forever or throwing timeouts), no change in case of NiFi restart,
even with disabling the auto resume feature.
By looking at the thread dump, I found a lot of:
{noformat}
"NiFi Web Server-142" Id=142 BLOCKED on
org.apache.hadoop.ipc.Client$Connection@3843e7a6
at org.apache.hadoop.ipc.Client$Connection.addCall(Client.java:463)
at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:375)
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1522)
at org.apache.hadoop.ipc.Client.call(Client.java:1451)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy348.getBlockLocations(Unknown Source)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy349.getBlockLocations(Unknown Source)
at
org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
at
org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
- waiting on java.lang.Object@46d84263
at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
at
org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
at
org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
at
org.apache.nifi.processors.kite.AbstractKiteProcessor.getSchema(AbstractKiteProcessor.java:132)
at
org.apache.nifi.processors.kite.AbstractKiteProcessor$3.validate(AbstractKiteProcessor.java:172)
at
org.apache.nifi.components.PropertyDescriptor.validate(PropertyDescriptor.java:200)
at
org.apache.nifi.components.AbstractConfigurableComponent.validate(AbstractConfigurableComponent.java:103)
at
org.apache.nifi.controller.AbstractConfiguredComponent.validate(AbstractConfiguredComponent.java:329)
at
org.apache.nifi.controller.StandardProcessorNode.isValid(StandardProcessorNode.java:968)
at
org.apache.nifi.groups.StandardProcessGroup.getCounts(StandardProcessGroup.java:227)
at
org.apache.nifi.groups.StandardProcessGroup.getCounts(StandardProcessGroup.java:261)
at
org.apache.nifi.groups.StandardProcessGroup.getCounts(StandardProcessGroup.java:261)
at
org.apache.nifi.groups.StandardProcessGroup.getCounts(StandardProcessGroup.java:261)
at
org.apache.nifi.web.StandardNiFiServiceFacade.getSiteToSiteDetails(StandardNiFiServiceFacade.java:2609)
at
org.apache.nifi.web.StandardNiFiServiceFacade$$FastClassBySpringCGLIB$$358780e0.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
at
org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:720)
at
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at
org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:85)
at
org.apache.nifi.web.NiFiServiceFacadeLock.proceedWithReadLock(NiFiServiceFacadeLock.java:137)
at
org.apache.nifi.web.NiFiServiceFacadeLock.getLock(NiFiServiceFacadeLock.java:108)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:621)
at
org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:610)
at
org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:68)
at
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at
org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92)
at
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at
org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:655)
at
org.apache.nifi.web.StandardNiFiServiceFacade$$EnhancerBySpringCGLIB$$6379e46a.getSiteToSiteDetails(<generated>)
at
org.apache.nifi.web.api.SiteToSiteResource.getSiteToSiteDetails(SiteToSiteResource.java:160)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60)
at
com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:205)
at
com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75)
at
com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:302)
at
com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108)
at
com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
at
com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84)
at
com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1542)
at
com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1473)
at
com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1419)
at
com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1409)
at
com.sun.jersey.spi.container.servlet.WebComponent.service(WebComponent.java:409)
at
com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:558)
at
com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:733)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:841)
at
org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1634)
at org.apache.nifi.web.filter.RequestLogger.doFilter(RequestLogger.java:66)
at
org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1621)
at
org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:316)
at
org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:126)
at
org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:90)
at
org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at
org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:122)
at
org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at
org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
at
org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at
org.apache.nifi.web.security.NiFiAuthenticationFilter.doFilter(NiFiAuthenticationFilter.java:59)
at
org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at
org.apache.nifi.web.security.NiFiAuthenticationFilter.doFilter(NiFiAuthenticationFilter.java:59)
at
org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at
org.apache.nifi.web.security.NiFiAuthenticationFilter.authenticate(NiFiAuthenticationFilter.java:83)
at
org.apache.nifi.web.security.NiFiAuthenticationFilter.doFilter(NiFiAuthenticationFilter.java:57)
at
org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at
org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:213)
at
org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:176)
at
org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346)
at
org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:262)
at
org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1621)
at org.apache.nifi.web.filter.TimerFilter.doFilter(TimerFilter.java:51)
at
org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1613)
at org.apache.nifi.web.server.JettyServer$2.doFilter(JettyServer.java:908)
at
org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1621)
at
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:541)
at
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
at
org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
at
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
at
org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
at
org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
at
org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
at
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
at
org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481)
at
org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562)
at
org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
at
org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141)
at
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at
org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118)
at
org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
at
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
at org.eclipse.jetty.server.Server.handle(Server.java:564)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
at
org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
at
org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
at
org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
at
org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:122)
at
org.eclipse.jetty.util.thread.strategy.ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58)
at
org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:201)
at
org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:133)
at
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
at
org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
at java.lang.Thread.run(Thread.java:748)
{noformat}
This is because of the {{validate}} method of the Kite processors (and more
specifically the following processors: {{ConvertAvroSchema}},
{{ConvertCSVToAvro}}, {{ConvertJSONToAvro}}).
By going through the flow.xml.gz file, I found the bad processor which was
configured with:
{noformat}
<property>
<name>Record schema</name>
<value>Liste_queue_id;Limite_alerte_nb;Limite_alerte_volume;processeur_group;Projet;Contact</value>
</property>{noformat}
It appears the users thought it was possible to provide a schema a bit like a
CSV header.
Because of this schema definition, the code:
{code:java}
try {
uri = new URI(uriOrLiteral);
} catch (URISyntaxException e) {
// try to parse the schema as a literal
return parseSchema(uriOrLiteral);
}
{code}
is not throwing any exception, and we go to this part of the code:
{code:java}
} else {
// try to open the file
Path schemaPath = new Path(uri);
FileSystem fs = schemaPath.getFileSystem(conf);
try (InputStream in = fs.open(schemaPath)) {
return parseSchema(uri, in);
}
}
{code}
which is calling the HDFS Distributed file system.
I honestly don't know why this call {{fs.open()}} is causing the BLOCKED
thread... but it caused a lot of troubles.
I'm not sure to see how this could be changed in a non-backward compatible
way... I suggest to check if a scheme is detected and if not, to consider the
processor as invalid. This could impact some users after an upgrade but we can
document it in the upgrade guide if this approach is OK. If there is a better
approach, let me know!
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)