[ 
https://issues.apache.org/jira/browse/NIFI-10148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17557120#comment-17557120
 ] 

ASF subversion and git services commented on NIFI-10148:
--------------------------------------------------------

Commit ac8c1b0326fc83b3e574bf9e7eb8ad1a26f7c918 in nifi's branch 
refs/heads/main from Mark Payne
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=ac8c1b0326 ]

NIFI-10148: Fixed bug in SwappablePriorityQueue, in which the Active Queue Size 
could be decremented before the Unacknowledged Queue Size is incremented when 
FlowFiles are polled. This can result in the SwappablePrioriotyQueue 
incorrectly returning a value of true from the isEmpty() method. Additionally, 
updated the NiFiSystemIT so that if waiting for a particular queue size we 
periodically log size of all queues, which aids in debugging, and added 
necessary methods to FlowClient to make that happen.

This closes #6143

Signed-off-by: David Handermann <[email protected]>


> When Process Group has FlowFile Concurrency of "Single FlowFile Per Node", 
> can occasionally let two FlowFiles in at the same time.
> ----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: NIFI-10148
>                 URL: https://issues.apache.org/jira/browse/NIFI-10148
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>            Priority: Major
>              Labels: SingleFlowFileConcurrency, batch, concurrency, 
> system-tests
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> When a Process Group is configured with a FlowFile Concurrency of "Single 
> FlowFile Per Node", only a single FlowFile should enter the Process Group at 
> a time. No other FlowFile should enter on that node until the PG is empty.
> However, occasionally, a second FlowFile appears to be allowed through. This 
> is evidenced by the intermittent failure of the {{BatchFlowBetweenGroupsIT}} 
> system test. Recently, the system tests failed with the following stack trace:
> {code:java}
> java.util.concurrent.TimeoutException: 
> testSingleConcurrencyAndBatchOutputToBatchInputOutput() timed out after 5 
> minutes
>     at 
> org.junit.jupiter.engine.extension.TimeoutInvocation.createTimeoutException(TimeoutInvocation.java:70)
>     at 
> org.junit.jupiter.engine.extension.TimeoutInvocation.proceed(TimeoutInvocation.java:59)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>     at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>     at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
>     at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>     at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>     at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
>     at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>     at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>     at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
>     at 
> org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
>     at 
> org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
>     at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
>     at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
>     at 
> org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
>     at 
> org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
>     at 
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188)
>     at 
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
>     at 
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:128)
>     at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
>     at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
>     at 
> org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
>     at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
>     Suppressed: java.lang.InterruptedException: sleep interrupted
>         at java.base/java.lang.Thread.sleep(Native Method)
>         at 
> [email protected]/org.apache.nifi.tests.system.NiFiSystemIT.waitFor(NiFiSystemIT.java:310)
>         at 
> [email protected]/org.apache.nifi.tests.system.NiFiSystemIT.waitFor(NiFiSystemIT.java:297)
>         at 
> [email protected]/org.apache.nifi.tests.system.NiFiSystemIT.waitForQueueCountToMatch(NiFiSystemIT.java:354)
>         at 
> [email protected]/org.apache.nifi.tests.system.NiFiSystemIT.waitForQueueCount(NiFiSystemIT.java:348)
>         at 
> [email protected]/org.apache.nifi.tests.system.pg.BatchFlowBetweenGroupsIT.testSingleConcurrencyAndBatchOutputToBatchInputOutput(BatchFlowBetweenGroupsIT.java:121)
>         at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>         at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>         at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
>         at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>         at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>         at 
> org.junit.jupiter.engine.extension.TimeoutInvocation.proceed(TimeoutInvocation.java:46)
>         ... 63 more
>  {code}
> Digging into the troubleshooting info that was gathered, we can see that the 
> tests was waiting for a queue to have 5 FlowFiles, but it never had 5 
> FlowFiles because two were allowed in simultaneously, resulting in that queue 
> having 10 FlowFiles.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to