This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit dc5c441ede6f2cc7642565f06504edf184cf6d3d Author: Gabor Gyimesi <[email protected]> AuthorDate: Fri Aug 16 13:47:12 2024 +0000 MINIFICPP-2445 Allow NiFi Python Source processor triggering without creating flow files Closes #1857 Signed-off-by: Marton Szasz <[email protected]> --- docker/test/integration/cluster/ImageStore.py | 2 ++ docker/test/integration/features/python.feature | 10 +++++++ .../integration/minifi/processors/CreateNothing.py | 25 +++++++++++++++++ .../integration/resources/python/CreateNothing.py | 32 ++++++++++++++++++++++ .../pythonprocessors/nifiapi/flowfilesource.py | 2 ++ 5 files changed, 71 insertions(+) diff --git a/docker/test/integration/cluster/ImageStore.py b/docker/test/integration/cluster/ImageStore.py index bd16a33dd..6983f36f6 100644 --- a/docker/test/integration/cluster/ImageStore.py +++ b/docker/test/integration/cluster/ImageStore.py @@ -168,6 +168,7 @@ class ImageStore: COPY subtractutils.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/subtractutils.py COPY RelativeImporterProcessor.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/RelativeImporterProcessor.py COPY multiplierutils.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/multiplierutils.py + COPY CreateNothing.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/CreateNothing.py RUN wget {parse_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\ wget {chunk_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\ echo 'langchain<=0.17.0' > /opt/minifi/minifi-current/minifi-python/nifi_python_processors/requirements.txt && \\ @@ -195,6 +196,7 @@ class ImageStore: os.path.join(self.test_dir, "resources", "python", "RelativeImporterProcessor.py"), os.path.join(self.test_dir, "resources", "python", "subtractutils.py"), os.path.join(self.test_dir, "resources", "python", "multiplierutils.py")]) + os.path.join(self.test_dir, "resources", "python", "CreateNothing.py")]) def __build_http_proxy_image(self): dockerfile = dedent("""\ diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature index a59b74bdd..7acc83536 100644 --- a/docker/test/integration/features/python.feature +++ b/docker/test/integration/features/python.feature @@ -188,3 +188,13 @@ Feature: MiNiFi can use python processors in its flows When all instances start up Then one flowfile with the contents "The final result is 1990" is placed in the monitored directory in less than 30 seconds + + @USE_NIFI_PYTHON_PROCESSORS + Scenario: NiFi native python processor is allowed to be triggered without creating any flow files + Given a CreateNothing processor + And a PutFile processor with the "Directory" property set to "/tmp/output" + And the "success" relationship of the CreateNothing processor is connected to the PutFile + And python is installed on the MiNiFi agent with a pre-created virtualenv + When the MiNiFi instance starts up + Then no files are placed in the monitored directory in 10 seconds of running time + And the Minifi logs do not contain the following message: "Caught Exception during SchedulingAgent::onTrigger of processor CreateNothing" after 1 seconds diff --git a/docker/test/integration/minifi/processors/CreateNothing.py b/docker/test/integration/minifi/processors/CreateNothing.py new file mode 100644 index 000000000..6c1d41e27 --- /dev/null +++ b/docker/test/integration/minifi/processors/CreateNothing.py @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from ..core.Processor import Processor + + +class CreateNothing(Processor): + def __init__(self, context, schedule={'scheduling period': '1 sec'}): + super(CreateNothing, self).__init__( + context=context, + clazz='CreateNothing', + class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.', + schedule=schedule, + auto_terminate=[]) diff --git a/docker/test/integration/resources/python/CreateNothing.py b/docker/test/integration/resources/python/CreateNothing.py new file mode 100644 index 000000000..b39dbcee4 --- /dev/null +++ b/docker/test/integration/resources/python/CreateNothing.py @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from nifiapi.flowfilesource import FlowFileSource + + +class CreateNothing(FlowFileSource): + class Java: + implements = ['org.apache.nifi.python.processor.FlowFileSource'] + + class ProcessorDetails: + version = '0.0.1-SNAPSHOT' + description = '''A Python processor for testing a use-case where the Source processor + does not create any output.''' + tags = ['test', 'python', 'source'] + + def __init__(self, **kwargs): + pass + + def create(self, context): + return None diff --git a/extensions/python/pythonprocessors/nifiapi/flowfilesource.py b/extensions/python/pythonprocessors/nifiapi/flowfilesource.py index 46510a925..be676d9ff 100644 --- a/extensions/python/pythonprocessors/nifiapi/flowfilesource.py +++ b/extensions/python/pythonprocessors/nifiapi/flowfilesource.py @@ -50,6 +50,8 @@ class FlowFileSource(ProcessorBase): context_proxy = ProcessContextProxy(context, self) try: result = self.create(context_proxy) + if not result: + return except Exception: self.logger.error("Failed to create flow file due to error:\n{}".format(traceback.format_exc())) return
