[
https://issues.apache.org/jira/browse/NIFI-12971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Joe Witt updated NIFI-12971:
----------------------------
Summary: Provide a utility to detect leaked ProcessSession objects in unit
tests or the UI (was: Provide a utility to detect leaked ProcessSession
objects in unit tests)
> Provide a utility to detect leaked ProcessSession objects in unit tests or
> the UI
> ---------------------------------------------------------------------------------
>
> Key: NIFI-12971
> URL: https://issues.apache.org/jira/browse/NIFI-12971
> Project: Apache NiFi
> Issue Type: Improvement
> Affects Versions: 1.25.0, 2.0.0-M2
> Reporter: endzeit
> Priority: Major
>
> When developing processors for NiFi, developers need to implement
> [Processor|https://www.javadoc.io/doc/org.apache.nifi/nifi-api/latest/org/apache/nifi/processor/Processor.html].
> Most often this is done by extending
> [AbstractProcessor|https://www.javadoc.io/doc/org.apache.nifi/nifi-api/latest/org/apache/nifi/processor/AbstractProcessor.html]
> which ensures that the
> [ProcessSession|https://www.javadoc.io/doc/org.apache.nifi/nifi-api/latest/org/apache/nifi/processor/ProcessSession.html]
> used is either committed or, if that's not possible, rolled back.
> In cases where the developer needs more control over session management, they
> might extend from
> [AbstractSessionFactoryProcessor|https://www.javadoc.io/doc/org.apache.nifi/nifi-api/latest/org/apache/nifi/processor/AbstractSessionFactoryProcessor.html]
> instead, which allows them to create and handle {{ProcessSessions}} on their
> own terms.
> When using the latter, developers need to ensure they handle all sessions
> created gracefully, that is, to commit or roll back all sessions they create,
> like {{AbstractProcessor}} ensures.
> However, failing to do so may lead to unnoticed leakage / loss of
> [FlowFile|https://www.javadoc.io/doc/org.apache.nifi/nifi-api/latest/org/apache/nifi/flowfile/FlowFile.html]s
> and their associated data.
> While data might be recovered from provenance, users are most likely not even
> aware of the data loss, as there won't be a bulletin visible in the UI
> indicating it: no Exception occurs and no error is logged.
> The following is a minimal example, which reproduces the problem. All
> {{FlowFiles}} that enter the processor leak and eventually get lost when the
> processor is shut down.
> {code:java}
> @InputRequirement(INPUT_REQUIRED)
> public class LeakFlowFile extends AbstractSessionFactoryProcessor {
>     public static final Relationship REL_SUCCESS = new Relationship.Builder()
>             .name("success")
>             .description("All FlowFiles are routed to this relationship.")
>             .build();
>     private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
>
>     @Override
>     public Set<Relationship> getRelationships() {
>         return RELATIONSHIPS;
>     }
>
>     @Override
>     public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
>         ProcessSession session = sessionFactory.createSession();
>         FlowFile flowFile = session.get();
>         if (flowFile == null) {
>             return;
>         }
>         session.transfer(flowFile, REL_SUCCESS);
>         // whoops, no commit or rollback
>     }
> }
> {code}
> While the issue is quite obvious in this example, it might not be for more
> complex processors, e.g. when based on
> [BinFiles|https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java].
> If a developer forgets to commit / roll back the session in
> {{processBin}}, the same behaviour can be observed.
> The behaviour is also not made visible by tests. The following test passes,
> even though the session has not been committed (or rolled back).
> {code:java}
> class LeakFlowFileTest {
>     private final TestRunner testRunner = TestRunners.newTestRunner(LeakFlowFile.class);
>
>     @Test
>     void doesNotDetectLeak() {
>         testRunner.enqueue("some data");
>         testRunner.run();
>         testRunner.assertAllFlowFilesTransferred(LeakFlowFile.REL_SUCCESS, 1);
>     }
> }
> {code}
> ----
> I would like to propose enhancements to NiFi in order to ease detection of
> such implementation faults or even confine the harm they might incur.
> One approach is to extend the capabilities of TestRunner such that on
> shutdown of a tested processor, it checks whether all sessions that were
> created during the test and had a change associated with them, e.g. pulling a
> FlowFile or adjusting state, do not have pending changes left but were
> properly handled, e.g. by committing the session. If any session still has
> pending changes, the test may fail, similar to trying to commit a session
> where FlowFiles haven't been transferred / removed.
> This way, developers that test their processors thoroughly might catch such
> implementation mistakes early on even before they get into production.
> However, in case tests are missing for a scenario or altogether, the issue
> might get overlooked and happen in production. On the plus side, such a
> change would only affect development and would have no (e.g. performance)
> impact on production environments. Side note: Maybe the TestRunner also
> should not treat FlowFiles as transferred until the session in which the
> transfer was issued is committed.
> A different approach would be to enhance {{AbstractSessionFactoryProcessor}}
> or a cooperating component to check for unhandled {{ProcessSessions}} on
> processor shutdown. In case an unhandled session is found, an error should be
> logged to make the issue visible and the session handled, probably by rolling
> back. While this might reduce the chance of data loss, it might impact
> performance and have other unpredictable side effects for processors that
> interact with external systems and have already communicated some changes.
> However, this might be worth it in order to not lose data.
> The approaches might be combined to reap the benefits of both, that is,
> catching issues in test cases early on but having a more graceful behaviour
> in production in case an issue is overlooked.
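
Both approaches described in the issue rely on the same mechanism: remembering every session that was created and checking, at shutdown, whether any of them still has pending changes. The following is a minimal, self-contained sketch of that idea and nothing more. {{SimpleSession}} and {{TrackingSessionFactory}} are hypothetical names invented for illustration; they are not part of the NiFi API and do not reflect how {{TestRunner}} or {{AbstractSessionFactoryProcessor}} are actually implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a session; NOT the NiFi ProcessSession API.
interface SimpleSession {
    void markChanged(); // models a change, e.g. pulling a FlowFile or adjusting state
    void commit();
    void rollback();
}

// Hypothetical factory that remembers every session it hands out.
final class TrackingSessionFactory {
    private final List<TrackedSession> created = new ArrayList<>();

    SimpleSession createSession() {
        TrackedSession session = new TrackedSession();
        created.add(session);
        return session;
    }

    // Number of sessions with changes that were neither committed nor rolled back.
    long countLeakedSessions() {
        return created.stream().filter(TrackedSession::hasPendingChanges).count();
    }

    // Test-side variant: fail loudly if any session was left unhandled.
    void assertNoLeakedSessions() {
        long leaked = countLeakedSessions();
        if (leaked > 0) {
            throw new AssertionError(leaked + " session(s) were neither committed nor rolled back");
        }
    }

    // Production-side variant: roll back unhandled sessions and report how many
    // were found, so the caller can log an error.
    long rollbackLeakedSessions() {
        long leaked = 0;
        for (TrackedSession session : created) {
            if (session.hasPendingChanges()) {
                session.rollback();
                leaked++;
            }
        }
        return leaked;
    }

    private static final class TrackedSession implements SimpleSession {
        private boolean pendingChanges;

        @Override
        public void markChanged() {
            pendingChanges = true;
        }

        @Override
        public void commit() {
            pendingChanges = false;
        }

        @Override
        public void rollback() {
            pendingChanges = false;
        }

        boolean hasPendingChanges() {
            return pendingChanges;
        }
    }
}
```

In this sketch, a test harness would call {{assertNoLeakedSessions()}} once the processor under test has been shut down, while a production framework would call {{rollbackLeakedSessions()}} and log an error if it returns a value greater than zero. Sessions that were created but never changed are deliberately not counted as leaked, matching the restriction in the issue to sessions that "had a change associated with them".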