Pierre Villard created NIFI-5838:
------------------------------------

             Summary: Misconfigured Kite processors can block NiFi
                 Key: NIFI-5838
                 URL: https://issues.apache.org/jira/browse/NIFI-5838
             Project: Apache NiFi
          Issue Type: Bug
          Components: Extensions
            Reporter: Pierre Villard
            Assignee: Pierre Villard


I faced the following situation today: no way to access the NiFi UI (it was 
just hanging forever or throwing timeouts), no change in case of NiFi restart, 
even with disabling the auto resume feature.

By looking at the thread dump, I found a lot of:
{noformat}
"NiFi Web Server-142" Id=142 BLOCKED  on 
org.apache.hadoop.ipc.Client$Connection@3843e7a6
    at org.apache.hadoop.ipc.Client$Connection.addCall(Client.java:463)
    at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:375)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1522)
    at org.apache.hadoop.ipc.Client.call(Client.java:1451)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy348.getBlockLocations(Unknown Source)
    at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy349.getBlockLocations(Unknown Source)
    at 
org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
    at 
org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
    at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
    - waiting on java.lang.Object@46d84263
    at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
    at 
org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
    at 
org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
    at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at 
org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
    at 
org.apache.nifi.processors.kite.AbstractKiteProcessor.getSchema(AbstractKiteProcessor.java:132)
    at 
org.apache.nifi.processors.kite.AbstractKiteProcessor$3.validate(AbstractKiteProcessor.java:172)
    at 
org.apache.nifi.components.PropertyDescriptor.validate(PropertyDescriptor.java:200)
    at 
org.apache.nifi.components.AbstractConfigurableComponent.validate(AbstractConfigurableComponent.java:103)
    at 
org.apache.nifi.controller.AbstractConfiguredComponent.validate(AbstractConfiguredComponent.java:329)
    at 
org.apache.nifi.controller.StandardProcessorNode.isValid(StandardProcessorNode.java:968)
    at 
org.apache.nifi.groups.StandardProcessGroup.getCounts(StandardProcessGroup.java:227)
    at 
org.apache.nifi.groups.StandardProcessGroup.getCounts(StandardProcessGroup.java:261)
    at 
org.apache.nifi.groups.StandardProcessGroup.getCounts(StandardProcessGroup.java:261)
    at 
org.apache.nifi.groups.StandardProcessGroup.getCounts(StandardProcessGroup.java:261)
    at 
org.apache.nifi.web.StandardNiFiServiceFacade.getSiteToSiteDetails(StandardNiFiServiceFacade.java:2609)
    at 
org.apache.nifi.web.StandardNiFiServiceFacade$$FastClassBySpringCGLIB$$358780e0.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
    at 
org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:720)
    at 
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at 
org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:85)
    at 
org.apache.nifi.web.NiFiServiceFacadeLock.proceedWithReadLock(NiFiServiceFacadeLock.java:137)
    at 
org.apache.nifi.web.NiFiServiceFacadeLock.getLock(NiFiServiceFacadeLock.java:108)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:621)
    at 
org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:610)
    at 
org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:68)
    at 
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at 
org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92)
    at 
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at 
org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:655)
    at 
org.apache.nifi.web.StandardNiFiServiceFacade$$EnhancerBySpringCGLIB$$6379e46a.getSiteToSiteDetails(<generated>)
    at 
org.apache.nifi.web.api.SiteToSiteResource.getSiteToSiteDetails(SiteToSiteResource.java:160)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60)
    at 
com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:205)
    at 
com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75)
    at 
com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:302)
    at 
com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108)
    at 
com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
    at 
com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84)
    at 
com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1542)
    at 
com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1473)
    at 
com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1419)
    at 
com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1409)
    at 
com.sun.jersey.spi.container.servlet.WebComponent.service(WebComponent.java:409)
    at 
com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:558)
    at 
com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:733)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:841)
    at 
org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1634)
    at org.apache.nifi.web.filter.RequestLogger.doFilter(RequestLogger.java:66)
    at 
org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1621)
    at 
org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:316)
    at 
org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:126)
    at 
org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:90)
    at 
org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at 
org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:122)
    at 
org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at 
org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
    at 
org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at 
org.apache.nifi.web.security.NiFiAuthenticationFilter.doFilter(NiFiAuthenticationFilter.java:59)
    at 
org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at 
org.apache.nifi.web.security.NiFiAuthenticationFilter.doFilter(NiFiAuthenticationFilter.java:59)
    at 
org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at 
org.apache.nifi.web.security.NiFiAuthenticationFilter.authenticate(NiFiAuthenticationFilter.java:83)
    at 
org.apache.nifi.web.security.NiFiAuthenticationFilter.doFilter(NiFiAuthenticationFilter.java:57)
    at 
org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at 
org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:213)
    at 
org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:176)
    at 
org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346)
    at 
org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:262)
    at 
org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1621)
    at org.apache.nifi.web.filter.TimerFilter.doFilter(TimerFilter.java:51)
    at 
org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1613)
    at org.apache.nifi.web.server.JettyServer$2.doFilter(JettyServer.java:908)
    at 
org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1621)
    at 
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:541)
    at 
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
    at 
org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
    at 
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at 
org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
    at 
org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
    at 
org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
    at 
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
    at 
org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481)
    at 
org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562)
    at 
org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
    at 
org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141)
    at 
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at 
org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118)
    at 
org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
    at 
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.Server.handle(Server.java:564)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
    at 
org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
    at 
org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
    at 
org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
    at 
org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:122)
    at 
org.eclipse.jetty.util.thread.strategy.ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58)
    at 
org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:201)
    at 
org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:133)
    at 
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
    at 
org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
    at java.lang.Thread.run(Thread.java:748)
{noformat}
This is because of the {{validate}} method of the Kite processors (and more 
specifically the following processors: {{ConvertAvroSchema}}, 
{{ConvertCSVToAvro}}, {{ConvertJSONToAvro}}).

By going through the flow.xml.gz file, I found the bad processor which was 
configured with:
{noformat}
<property>
  <name>Record schema</name>
  
<value>Liste_queue_id;Limite_alerte_nb;Limite_alerte_volume;processeur_group;Projet;Contact</value>
</property>{noformat}
It appears the users thought it was possible to provide a schema a bit like a 
CSV header.

Because of this schema definition, the code:
{code:java}
try {
  uri = new URI(uriOrLiteral);
} catch (URISyntaxException e) {
  // try to parse the schema as a literal
  return parseSchema(uriOrLiteral);
}
{code}
is not throwing any exception, and we go to this part of the code:
{code:java}
} else {
  // try to open the file
  Path schemaPath = new Path(uri);
  FileSystem fs = schemaPath.getFileSystem(conf);
  try (InputStream in = fs.open(schemaPath)) {
    return parseSchema(uri, in);
  }
}
{code}
which is calling the HDFS Distributed file system.

I honestly don't know why this call {{fs.open()}} is causing the BLOCKED 
thread... but it caused a lot of troubles.

I'm not sure to see how this could be changed in a non-backward compatible 
way... I suggest to check if a scheme is detected and if not, to consider the 
processor as invalid. This could impact some users after an upgrade but we can 
document it in the upgrade guide if this approach is OK. If there is a better 
approach, let me know!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to