[ 
https://issues.apache.org/jira/browse/NIFI-10148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mark Payne updated NIFI-10148:
------------------------------
    Labels: SingleFlowFileConcurrency batch concurrency system-tests  (was: )

> When Process Group has FlowFile Concurrency of "Single FlowFile Per Node", 
> can occasionally let two FlowFiles in at the same time.
> ----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: NIFI-10148
>                 URL: https://issues.apache.org/jira/browse/NIFI-10148
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>            Priority: Major
>              Labels: SingleFlowFileConcurrency, batch, concurrency, 
> system-tests
>
> When a Process Group is configured with a FlowFile Concurrency of "Single 
> FlowFile Per Node", only a single FlowFile should enter the Process Group at 
> a time. No other FlowFile should enter on that node until the PG is empty.
> However, occasionally, a second FlowFile appears to be allowed through. This 
> is evidenced by the intermittent failure of the {{BatchFlowBetweenGroupsIT}} 
> system test. Recently, the system tests failed with the following stack trace:
> {code:java}
> java.util.concurrent.TimeoutException: 
> testSingleConcurrencyAndBatchOutputToBatchInputOutput() timed out after 5 
> minutes
>     at 
> org.junit.jupiter.engine.extension.TimeoutInvocation.createTimeoutException(TimeoutInvocation.java:70)
>     at 
> org.junit.jupiter.engine.extension.TimeoutInvocation.proceed(TimeoutInvocation.java:59)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>     at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>     at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
>     at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>     at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>     at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
>     at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>     at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>     at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
>     at 
> org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
>     at 
> org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
>     at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
>     at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
>     at 
> org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
>     at 
> org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
>     at 
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188)
>     at 
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
>     at 
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:128)
>     at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
>     at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
>     at 
> org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
>     at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
>     Suppressed: java.lang.InterruptedException: sleep interrupted
>         at java.base/java.lang.Thread.sleep(Native Method)
>         at 
> [email protected]/org.apache.nifi.tests.system.NiFiSystemIT.waitFor(NiFiSystemIT.java:310)
>         at 
> [email protected]/org.apache.nifi.tests.system.NiFiSystemIT.waitFor(NiFiSystemIT.java:297)
>         at 
> [email protected]/org.apache.nifi.tests.system.NiFiSystemIT.waitForQueueCountToMatch(NiFiSystemIT.java:354)
>         at 
> [email protected]/org.apache.nifi.tests.system.NiFiSystemIT.waitForQueueCount(NiFiSystemIT.java:348)
>         at 
> [email protected]/org.apache.nifi.tests.system.pg.BatchFlowBetweenGroupsIT.testSingleConcurrencyAndBatchOutputToBatchInputOutput(BatchFlowBetweenGroupsIT.java:121)
>         at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>         at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>         at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
>         at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>         at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>         at 
> org.junit.jupiter.engine.extension.TimeoutInvocation.proceed(TimeoutInvocation.java:46)
>         ... 63 more
>  {code}
> Digging into the troubleshooting info that was gathered, we can see that the 
> tests was waiting for a queue to have 5 FlowFiles, but it never had 5 
> FlowFiles because two were allowed in simultaneously, resulting in that queue 
> having 10 FlowFiles.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to