[ https://issues.apache.org/jira/browse/NIFI-12971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Joe Witt updated NIFI-12971:
----------------------------
Summary: Provide a utility to detect leaked ProcessSession objects in unit tests (was: Processor may leak / lose ProcessSession with FlowFile)

Provide a utility to detect leaked ProcessSession objects in unit tests
-----------------------------------------------------------------------

Key: NIFI-12971
URL: https://issues.apache.org/jira/browse/NIFI-12971
Project: Apache NiFi
Issue Type: Improvement
Affects Versions: 1.25.0, 2.0.0-M2
Reporter: endzeit
Priority: Major

When developing processors for NiFi, developers need to implement [Processor|https://www.javadoc.io/doc/org.apache.nifi/nifi-api/latest/org/apache/nifi/processor/Processor.html]. Most often this is done by extending [AbstractProcessor|https://www.javadoc.io/doc/org.apache.nifi/nifi-api/latest/org/apache/nifi/processor/AbstractProcessor.html], which ensures that the [ProcessSession|https://www.javadoc.io/doc/org.apache.nifi/nifi-api/latest/org/apache/nifi/processor/ProcessSession.html] it uses is either committed or, if that's not possible, rolled back.

In cases where the developer needs more control over session management, they might extend [AbstractSessionFactoryProcessor|https://www.javadoc.io/doc/org.apache.nifi/nifi-api/latest/org/apache/nifi/processor/AbstractSessionFactoryProcessor.html] instead, which allows them to create and handle {{ProcessSessions}} on their own terms.

When using the latter, developers must handle every session they create gracefully, that is, commit or roll back each one, as {{AbstractProcessor}} does. Failing to do so may lead to unnoticed leakage or loss of [FlowFile|https://www.javadoc.io/doc/org.apache.nifi/nifi-api/latest/org/apache/nifi/flowfile/FlowFile.html]s and their associated data.

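
The rule that every self-created session must end in a commit or a rollback can be sketched with a small helper. This is a self-contained illustration under stated assumptions. {{SimpleSession}}, {{Sessions.runInSession}} and {{RecordingSession}} are hypothetical stand-ins, not NiFi API. In an {{AbstractSessionFactoryProcessor}}, the same try/commit, catch/rollback shape would wrap the session obtained from {{sessionFactory.createSession()}}. That is roughly the guarantee described above for {{AbstractProcessor}}.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical minimal session abstraction; NOT org.apache.nifi.processor.ProcessSession.
interface SimpleSession {
    void commit();
    void rollback();
}

final class Sessions {
    private Sessions() {
    }

    // Creates one session and never leaves it open: commit after the work succeeds,
    // roll back (and rethrow) if the work or the commit throws.
    static void runInSession(Supplier<? extends SimpleSession> sessionFactory,
                             Consumer<SimpleSession> work) {
        SimpleSession session = sessionFactory.get();
        try {
            work.accept(session);
            session.commit();
        } catch (RuntimeException | Error e) {
            session.rollback();
            throw e;
        }
    }
}

// Hypothetical test double that records which terminal call a session received.
class RecordingSession implements SimpleSession {
    final List<String> events = new ArrayList<>();

    @Override
    public void commit() {
        events.add("commit");
    }

    @Override
    public void rollback() {
        events.add("rollback");
    }
}
{code}

Because the helper rethrows after rolling back, a failure still reaches the caller instead of being swallowed.
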
While the data might be recovered from provenance, users are most likely not even aware of the loss. No exception occurs and no error is logged, so no bulletin indicating data loss appears in the UI.

The following minimal example reproduces the problem. Every FlowFile that enters the processor leaks and is eventually lost when the processor is shut down.

{code:java}
import static org.apache.nifi.annotation.behavior.InputRequirement.Requirement.INPUT_REQUIRED;

import java.util.Set;

import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

@InputRequirement(INPUT_REQUIRED)
public class LeakFlowFile extends AbstractSessionFactoryProcessor {

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("All FlowFiles are routed to this relationship.")
            .build();

    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        // The processor creates and owns this session, so it must commit or roll it back
        ProcessSession session = sessionFactory.createSession();
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        session.transfer(flowFile, REL_SUCCESS);
        // whoops, no commit or rollback: the transfer never takes effect
    }
}
{code}

While the issue is obvious in this example, it may not be in more complex processors, e.g. those based on [BinFiles|https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java]. If a developer forgets to commit or roll back the session in {{processBin}}, the same behavior can be observed.

Tests do not reveal the problem either. The following test passes even though the session has been neither committed nor rolled back.

{code:java}
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;

class LeakFlowFileTest {

    private final TestRunner testRunner = TestRunners.newTestRunner(LeakFlowFile.class);

    @Test
    void doesNotDetectLeak() {
        testRunner.enqueue("some data");
        testRunner.run();
        // Passes, although LeakFlowFile never commits or rolls back its session
        testRunner.assertAllFlowFilesTransferred(LeakFlowFile.REL_SUCCESS, 1);
    }
}
{code}

----

I would like to propose enhancements to NiFi that make such implementation faults easier to detect, or even limit the harm they can cause.

One approach is to extend {{TestRunner}} so that, when a tested processor is shut down, it checks every session that was created during the test and had a change associated with it (e.g. pulling a FlowFile or adjusting state). Each such session must have been properly handled, e.g. committed, and have no pending changes left. If that is not the case, the test should fail, similar to how committing a session fails when its FlowFiles have not been transferred or removed.

This way, developers who test their processors thoroughly can catch such mistakes early, before they reach production. However, if tests are missing for a scenario, or altogether, the issue might be overlooked and surface in production. On the plus side, such a change would only affect development and would have no impact (e.g. on performance) in production environments. Side note: perhaps the {{TestRunner}} also should not treat FlowFiles as transferred until the session in which the transfer was issued is committed.

A different approach would be to enhance {{AbstractSessionFactoryProcessor}}, or a cooperating component, to check for unhandled {{ProcessSessions}} when the processor is shut down. If an unhandled session is found, an error should be logged to make the issue visible, and the session should be handled, probably by rolling it back.

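
Both approaches can be illustrated with a minimal self-contained sketch, along with the side note about deferring transfers until commit. {{TrackedSession}} and {{TrackingSessionFactory}} are hypothetical types that model FlowFiles as unique strings in a queue. They are not the NiFi {{ProcessSession}}, {{ProcessSessionFactory}} or {{TestRunner}} API. {{assertAllSessionsHandled()}} stands in for the proposed test-time check. {{rollbackUnhandled(...)}} stands in for the proposed shutdown-time handling that logs an error and rolls the session back.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified session model; NOT the NiFi ProcessSession API.
// Items are assumed to be unique identifiers (think FlowFile UUIDs).
class TrackedSession {
    private final Deque<String> inputQueue;            // shared incoming queue
    private final Map<String, List<String>> committed; // relationship -> committed items
    private final List<String> pulled = new ArrayList<>();
    private final Map<String, String> pendingTransfers = new LinkedHashMap<>(); // item -> relationship

    TrackedSession(Deque<String> inputQueue, Map<String, List<String>> committed) {
        this.inputQueue = inputQueue;
        this.committed = committed;
    }

    String get() {
        String item = inputQueue.poll();
        if (item != null) {
            pulled.add(item);
        }
        return item;
    }

    void transfer(String item, String relationship) { pendingTransfers.put(item, relationship); }

    // Pulling or transferring an item counts as a change that must be committed or rolled back.
    boolean hasPendingChanges() { return !pulled.isEmpty() || !pendingTransfers.isEmpty(); }

    void commit() {
        for (String item : pulled) {
            if (!pendingTransfers.containsKey(item)) {
                throw new IllegalStateException(item + " was pulled but not transferred");
            }
        }
        // Transfers become visible only on commit, as the side note suggests for TestRunner.
        pendingTransfers.forEach((item, relationship) ->
                committed.computeIfAbsent(relationship, key -> new ArrayList<>()).add(item));
        pulled.clear();
        pendingTransfers.clear();
    }

    void rollback() {
        // Return pulled items to the head of the queue in their original order.
        for (int i = pulled.size() - 1; i >= 0; i--) {
            inputQueue.addFirst(pulled.get(i));
        }
        pulled.clear();
        pendingTransfers.clear();
    }
}

class TrackingSessionFactory {
    private final Deque<String> inputQueue = new ArrayDeque<>();
    private final Map<String, List<String>> committed = new LinkedHashMap<>();
    private final List<TrackedSession> created = new ArrayList<>(); // every session ever handed out

    void enqueue(String item) { inputQueue.add(item); }

    int queuedCount() { return inputQueue.size(); }

    List<String> committedTo(String relationship) { return committed.getOrDefault(relationship, List.of()); }

    TrackedSession createSession() {
        TrackedSession session = new TrackedSession(inputQueue, committed);
        created.add(session);
        return session;
    }

    // Approach 1 (tests): fail if any created session still has pending changes.
    void assertAllSessionsHandled() {
        long unhandled = created.stream().filter(TrackedSession::hasPendingChanges).count();
        if (unhandled > 0) {
            throw new AssertionError(unhandled + " session(s) were neither committed nor rolled back");
        }
    }

    // Approach 2 (shutdown): log an error for each unhandled session, then roll it back.
    int rollbackUnhandled(Consumer<String> errorLog) {
        int rolledBack = 0;
        for (TrackedSession session : created) {
            if (session.hasPendingChanges()) {
                errorLog.accept("Rolling back a session that was neither committed nor rolled back");
                session.rollback();
                rolledBack++;
            }
        }
        return rolledBack;
    }
}
{code}

Only sessions with pending changes are flagged. A session that found the queue empty, such as the {{flowFile == null}} branch in {{LeakFlowFile}}, is therefore not reported. This matches the proposal's restriction of the check to sessions that had a change associated with them.
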
While rolling back unhandled sessions on shutdown might reduce the chance of data loss, it might impact performance. It might also have other unpredictable side effects for processors that interact with external systems and have already communicated some changes to them. However, this might be worth it to avoid losing data.

The two approaches might be combined to get the benefits of both: catching issues early in test cases, while behaving more gracefully in production when an issue is overlooked.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)