This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 9574ad623f9b5a675102aa3e4134135d240823a7 Author: Gabor Gyimesi <[email protected]> AuthorDate: Thu Nov 20 14:40:35 2025 +0100 MINIFICPP-2692 Fix flow file removal for volatile repositories Closes #2076 Signed-off-by: Marton Szasz <[email protected]> --- .../containers/minifi_container.py | 9 ++++++ .../minifi_test_framework/steps/checking_steps.py | 10 +++++++ .../src/minifi_test_framework/steps/core_steps.py | 7 +++++ .../tests/features/repository.feature | 32 ++++++++++++++++++++++ libminifi/src/core/ProcessSession.cpp | 11 ++++++-- 5 files changed, 66 insertions(+), 3 deletions(-) diff --git a/behave_framework/src/minifi_test_framework/containers/minifi_container.py b/behave_framework/src/minifi_test_framework/containers/minifi_container.py index 805b04227..b2cfeaeaf 100644 --- a/behave_framework/src/minifi_test_framework/containers/minifi_container.py +++ b/behave_framework/src/minifi_test_framework/containers/minifi_container.py @@ -15,6 +15,7 @@ # limitations under the License. # +import logging from docker.models.networks import Network from minifi_test_framework.containers.file import File @@ -82,3 +83,11 @@ class MinifiContainer(Container): def _get_log_properties_file_content(self): lines = (f"{key}={value}" for key, value in self.log_properties.items()) return "\n".join(lines) + + def get_memory_usage(self) -> int | None: + exit_code, output = self.exec_run(["awk", "/VmRSS/ { printf \"%d\\n\", $2 }", "/proc/1/status"]) + if exit_code != 0: + return None + memory_usage_in_bytes = int(output.strip()) * 1024 + logging.info(f"MiNiFi memory usage: {memory_usage_in_bytes} bytes") + return memory_usage_in_bytes diff --git a/behave_framework/src/minifi_test_framework/steps/checking_steps.py b/behave_framework/src/minifi_test_framework/steps/checking_steps.py index 231a2be89..418c49c4f 100644 --- a/behave_framework/src/minifi_test_framework/steps/checking_steps.py +++ b/behave_framework/src/minifi_test_framework/steps/checking_steps.py @@ -142,3 +142,13 @@ def step_impl(context: MinifiTestContext, content: str, directory: str, duration assert wait_for_condition( condition=lambda: context.get_default_minifi_container().verify_path_with_json_content(directory, content), timeout_seconds=timeout_in_seconds, bail_condition=lambda: context.get_default_minifi_container().exited, context=context) + + +@then('MiNiFi\'s memory usage does not increase by more than {max_increase} after {duration}') +def step_impl(context: MinifiTestContext, max_increase: str, duration: str): + time_in_seconds = humanfriendly.parse_timespan(duration) + max_increase_in_bytes = humanfriendly.parse_size(max_increase) + initial_memory_usage = context.get_default_minifi_container().get_memory_usage() + time.sleep(time_in_seconds) + final_memory_usage = context.get_default_minifi_container().get_memory_usage() + assert final_memory_usage - initial_memory_usage <= max_increase_in_bytes diff --git a/behave_framework/src/minifi_test_framework/steps/core_steps.py b/behave_framework/src/minifi_test_framework/steps/core_steps.py index 552ea2d19..343e622f7 100644 --- a/behave_framework/src/minifi_test_framework/steps/core_steps.py +++ b/behave_framework/src/minifi_test_framework/steps/core_steps.py @@ -19,6 +19,7 @@ import logging import random import string import os +import time import humanfriendly from behave import when, step, given @@ -66,3 +67,9 @@ def step_impl(context: MinifiTestContext, filename: str, container_path: str, co @given('a host resource file "{filename}" is bound to the "{container_path}" path in the MiNiFi container') def step_impl(context: MinifiTestContext, filename: str, container_path: str): context.execute_steps(f"given a host resource file \"{filename}\" is bound to the \"{container_path}\" path in the MiNiFi container \"{DEFAULT_MINIFI_CONTAINER_NAME}\"") + + +@step("after {duration} have passed") +@step("after {duration} has passed") +def step_impl(context, duration): + time.sleep(humanfriendly.parse_timespan(duration)) diff --git a/extensions/standard-processors/tests/features/repository.feature b/extensions/standard-processors/tests/features/repository.feature new file mode 100644 index 000000000..5cc8c6949 --- /dev/null +++ b/extensions/standard-processors/tests/features/repository.feature @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +@CORE +Feature: Flow file and content repositories work as expected + + Scenario: Flow file content is removed from memory when terminated when using Volatile Content Repository + Given a GenerateFlowFile processor with the "File Size" property set to "20 MB" + And the scheduling period of the GenerateFlowFile processor is set to "1 sec" + And a LogAttribute processor + And LogAttribute is EVENT_DRIVEN + And the "success" relationship of the GenerateFlowFile processor is connected to the LogAttribute + And LogAttribute's success relationship is auto-terminated + And MiNiFi configuration "nifi.content.repository.class.name" is set to "VolatileContentRepository" + And MiNiFi configuration "nifi.flowfile.repository.class.name" is set to "NoOpRepository" + + When the MiNiFi instance starts up + And after 5 seconds have passed + + Then MiNiFi's memory usage does not increase by more than 50 MB after 30 seconds diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 1eeb5a009..a0d7bf884 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -1038,10 +1038,15 @@ void ProcessSessionImpl::persistFlowFilesBeforeTransfer( std::map<Connectable*, std::vector<std::shared_ptr<core::FlowFile> > >& transactionMap, const std::map<utils::Identifier, FlowFileUpdate>& modifiedFlowFiles) { - std::vector<std::pair<std::string, std::unique_ptr<io::BufferStream>>> flowData; - auto flowFileRepo = process_context_->getFlowFileRepository(); - auto contentRepo = process_context_->getContentRepository(); + + // In case of a noop repository we do not persist anything, flow files are only stored in memory, so we do not need to adjust the owned count + // Otherwise the increase of the owned count would result in memory leaks, as the count is not decreased later in the noop repository + if (flowFileRepo->isNoop()) { + return; + } + + std::vector<std::pair<std::string, std::unique_ptr<io::BufferStream>>> flowData; enum class Type { Dropped, Transferred
