This is an automated email from the ASF dual-hosted git repository. martinzink pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 1e199675e43b3cb8fae7e6718ff039c76dc93ff6 Author: Gabor Gyimesi <[email protected]> AuthorDate: Wed Aug 21 13:36:04 2024 +0200 MINIFICPP-2439 Restrict failure and original python relationship usage Closes #1852 Signed-off-by: Martin Zink <[email protected]> --- docker/test/integration/cluster/ImageStore.py | 8 +++++-- docker/test/integration/features/python.feature | 24 ++++++++++++++++++++ .../minifi/processors/FailureWithContent.py | 26 ++++++++++++++++++++++ .../minifi/processors/TransferToOriginal.py | 26 ++++++++++++++++++++++ .../resources/python/FailureWithContent.py | 24 ++++++++++++++++++++ .../python/ProcessContextInterfaceChecker.py | 16 ++++++------- .../resources/python/SpecialPropertyTypeChecker.py | 22 +++++++++--------- .../resources/python/TransferToOriginal.py | 24 ++++++++++++++++++++ .../pythonprocessors/nifiapi/flowfiletransform.py | 8 +++++++ 9 files changed, 157 insertions(+), 21 deletions(-) diff --git a/docker/test/integration/cluster/ImageStore.py b/docker/test/integration/cluster/ImageStore.py index 6983f36f6..ce0a694d4 100644 --- a/docker/test/integration/cluster/ImageStore.py +++ b/docker/test/integration/cluster/ImageStore.py @@ -169,6 +169,8 @@ class ImageStore: COPY RelativeImporterProcessor.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/RelativeImporterProcessor.py COPY multiplierutils.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/multiplierutils.py COPY CreateNothing.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/CreateNothing.py + COPY FailureWithContent.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/FailureWithContent.py + COPY TransferToOriginal.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/TransferToOriginal.py RUN wget {parse_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\ wget {chunk_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\ echo 'langchain<=0.17.0' > /opt/minifi/minifi-current/minifi-python/nifi_python_processors/requirements.txt && \\ @@ -195,8 +197,10 @@ class ImageStore: os.path.join(self.test_dir, "resources", "python", "FailureWithAttributes.py"), os.path.join(self.test_dir, "resources", "python", "RelativeImporterProcessor.py"), os.path.join(self.test_dir, "resources", "python", "subtractutils.py"), - os.path.join(self.test_dir, "resources", "python", "multiplierutils.py")]) - os.path.join(self.test_dir, "resources", "python", "CreateNothing.py")]) + os.path.join(self.test_dir, "resources", "python", "multiplierutils.py"), + os.path.join(self.test_dir, "resources", "python", "CreateNothing.py"), + os.path.join(self.test_dir, "resources", "python", "FailureWithContent.py"), + os.path.join(self.test_dir, "resources", "python", "TransferToOriginal.py")]) def __build_http_proxy_image(self): dockerfile = dedent("""\ diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature index 7acc83536..ab73628dd 100644 --- a/docker/test/integration/features/python.feature +++ b/docker/test/integration/features/python.feature @@ -198,3 +198,27 @@ Feature: MiNiFi can use python processors in its flows When the MiNiFi instance starts up Then no files are placed in the monitored directory in 10 seconds of running time And the Minifi logs do not contain the following message: "Caught Exception during SchedulingAgent::onTrigger of processor CreateNothing" after 1 seconds + + @USE_NIFI_PYTHON_PROCESSORS + Scenario: NiFi native python processor cannot specify content of failure result + Given a GenerateFlowFile processor with the "File Size" property set to "0B" + And a FailureWithContent processor + And python is installed on the MiNiFi agent with a pre-created virtualenv + + And the "success" relationship of the GenerateFlowFile processor is connected to the FailureWithContent + + When all instances start up + + Then the Minifi logs contain the following message: "'failure' relationship should not have content, the original flow file will be transferred automatically in this case." in less than 60 seconds + + @USE_NIFI_PYTHON_PROCESSORS + Scenario: NiFi native python processor cannot transfer to original relationship + Given a GenerateFlowFile processor with the "File Size" property set to "0B" + And a TransferToOriginal processor + And python is installed on the MiNiFi agent with a pre-created virtualenv + + And the "success" relationship of the GenerateFlowFile processor is connected to the TransferToOriginal + + When all instances start up + + Then the Minifi logs contain the following message: "Result relationship cannot be 'original', it is reserved for the original flow file, and transferred automatically in non-failure cases." in less than 60 seconds diff --git a/docker/test/integration/minifi/processors/FailureWithContent.py b/docker/test/integration/minifi/processors/FailureWithContent.py new file mode 100644 index 000000000..88384a792 --- /dev/null +++ b/docker/test/integration/minifi/processors/FailureWithContent.py @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from ..core.Processor import Processor + + +class FailureWithContent(Processor): + def __init__(self, context): + super(FailureWithContent, self).__init__( + context=context, + clazz='FailureWithContent', + class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.', + properties={}, + schedule={'scheduling strategy': 'EVENT_DRIVEN'}, + auto_terminate=[]) diff --git a/docker/test/integration/minifi/processors/TransferToOriginal.py b/docker/test/integration/minifi/processors/TransferToOriginal.py new file mode 100644 index 000000000..6155d4032 --- /dev/null +++ b/docker/test/integration/minifi/processors/TransferToOriginal.py @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from ..core.Processor import Processor + + +class TransferToOriginal(Processor): + def __init__(self, context): + super(TransferToOriginal, self).__init__( + context=context, + clazz='TransferToOriginal', + class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.', + properties={}, + schedule={'scheduling strategy': 'EVENT_DRIVEN'}, + auto_terminate=[]) diff --git a/docker/test/integration/resources/python/FailureWithContent.py b/docker/test/integration/resources/python/FailureWithContent.py new file mode 100644 index 000000000..a5e03d222 --- /dev/null +++ b/docker/test/integration/resources/python/FailureWithContent.py @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult + + +class FailureWithContent(FlowFileTransform): + def __init__(self, **kwargs): + pass + + def transform(self, context, flowFile): + return FlowFileTransformResult("failure", contents="Content?") diff --git a/docker/test/integration/resources/python/ProcessContextInterfaceChecker.py b/docker/test/integration/resources/python/ProcessContextInterfaceChecker.py index 83cf9f27a..5d0136ade 100644 --- a/docker/test/integration/resources/python/ProcessContextInterfaceChecker.py +++ b/docker/test/integration/resources/python/ProcessContextInterfaceChecker.py @@ -63,30 +63,30 @@ class ProcessContextInterfaceChecker(FlowFileTransform): def transform(self, context, flowFile): properties = context.getProperties() if len(properties) != 3: - return FlowFileTransformResult("failure", contents="Property count is invalid") + return FlowFileTransformResult("failure") property_names = [property.name for property in properties] if "Secret Password" not in property_names or "Request Timeout" not in property_names or "Wish Count" not in property_names: - return FlowFileTransformResult("failure", contents="Missing properties") + return FlowFileTransformResult("failure") for property in properties: if property.name == "Secret Password" and properties[property] != "mysecret": - return FlowFileTransformResult("failure", contents="Secret Password value is invalid") + return FlowFileTransformResult("failure") elif property.name == "Request Timeout" and properties[property] != "60 sec": - return FlowFileTransformResult("failure", contents="Request Timeout value is invalid") + return FlowFileTransformResult("failure") elif property.name == "Wish Count" and properties[property] != "3": - return FlowFileTransformResult("failure", contents="Wish Count value is invalid") + return FlowFileTransformResult("failure") secret_password = context.getProperty(self.SECRET_PASSWORD).getValue() if secret_password != "mysecret": - return FlowFileTransformResult("failure", contents="Secret password is invalid") + return FlowFileTransformResult("failure") timeout = context.getProperty(self.REQUEST_TIMEOUT).getValue() if timeout != "60 sec": - return FlowFileTransformResult("failure", contents="Request timeout is invalid") + return FlowFileTransformResult("failure") wish_count = context.getProperty(self.WISH_COUNT).getValue() if wish_count != "3": - return FlowFileTransformResult("failure", contents="Wish count is invalid") + return FlowFileTransformResult("failure") return FlowFileTransformResult("myrelationship", contents="Check successful!") diff --git a/docker/test/integration/resources/python/SpecialPropertyTypeChecker.py b/docker/test/integration/resources/python/SpecialPropertyTypeChecker.py index ce404a754..f9b0a35aa 100644 --- a/docker/test/integration/resources/python/SpecialPropertyTypeChecker.py +++ b/docker/test/integration/resources/python/SpecialPropertyTypeChecker.py @@ -56,57 +56,57 @@ class SpecialPropertyTypeChecker(FlowFileTransform): time_in_microseconds = context.getProperty(self.TIME_PERIOD_PROPERTY).asTimePeriod(TimeUnit.MICROSECONDS) if time_in_microseconds != 7200000000: self.logger.error("Time period property conversion to microseconds is not working as expected") - return FlowFileTransformResult("failure", contents="Time period property conversion to microseconds is not working as expected") + return FlowFileTransformResult("failure") time_in_milliseconds = context.getProperty(self.TIME_PERIOD_PROPERTY).asTimePeriod(TimeUnit.MILLISECONDS) if time_in_milliseconds != 7200000: self.logger.error("Time period property conversion to milliseconds is not working as expected") - return FlowFileTransformResult("failure", contents="Time period property conversion to milliseconds is not working as expected") + return FlowFileTransformResult("failure") time_in_seconds = context.getProperty(self.TIME_PERIOD_PROPERTY).asTimePeriod(TimeUnit.SECONDS) if time_in_seconds != 7200: self.logger.error("Time period property conversion to seconds is not working as expected") - return FlowFileTransformResult("failure", contents="Time period property conversion to seconds is not working as expected") + return FlowFileTransformResult("failure") time_in_minutes = context.getProperty(self.TIME_PERIOD_PROPERTY).asTimePeriod(TimeUnit.MINUTES) if time_in_minutes != 120: self.logger.error("Time period property conversion to minutes is not working as expected") - return FlowFileTransformResult("failure", contents="Time period property conversion to minutes is not working as expected") + return FlowFileTransformResult("failure") time_in_hours = context.getProperty(self.TIME_PERIOD_PROPERTY).asTimePeriod(TimeUnit.HOURS) if time_in_hours != 2: self.logger.error("Time period property conversion to hours is not working as expected") - return FlowFileTransformResult("failure", contents="Time period property conversion to hours is not working as expected") + return FlowFileTransformResult("failure") time_in_days = context.getProperty(self.TIME_PERIOD_PROPERTY).asTimePeriod(TimeUnit.DAYS) if time_in_days != 0: self.logger.error("Time period property conversion to days is not working as expected") - return FlowFileTransformResult("failure", contents="Time period property conversion to days is not working as expected") + return FlowFileTransformResult("failure") data_size_in_bytes = context.getProperty(self.DATA_SIZE_PROPERTY).asDataSize(DataUnit.B) if data_size_in_bytes != 104857600.0: self.logger.error("Data size property conversion to bytes is not working as expected") - return FlowFileTransformResult("failure", contents="Data size property conversion to bytes is not working as expected") + return FlowFileTransformResult("failure") data_size_in_kilobytes = context.getProperty(self.DATA_SIZE_PROPERTY).asDataSize(DataUnit.KB) if data_size_in_kilobytes != 102400.0: self.logger.error("Data size property conversion to kilobytes is not working as expected") - return FlowFileTransformResult("failure", contents="Data size property conversion to kilobytes is not working as expected") + return FlowFileTransformResult("failure") data_size_in_megabytes = context.getProperty(self.DATA_SIZE_PROPERTY).asDataSize(DataUnit.MB) if data_size_in_megabytes != 100.0: self.logger.error("Data size property conversion to megabytes is not working as expected") - return FlowFileTransformResult("failure", contents="Data size property conversion to megabytes is not working as expected") + return FlowFileTransformResult("failure") data_size_in_gigabytes = context.getProperty(self.DATA_SIZE_PROPERTY).asDataSize(DataUnit.GB) if data_size_in_gigabytes != 0.09765625: self.logger.error("Data size property conversion to gigabytes is not working as expected") - return FlowFileTransformResult("failure", contents="Data size property conversion to gigabytes is not working as expected") + return FlowFileTransformResult("failure") ssl_context = context.getProperty(self.SSL_CONTEXT_PROPERTY).asControllerService() cert = ssl_context.getCertificateFile() if cert != "/tmp/resources/minifi_client.crt": self.logger.error("SSL Context Service property is not working as expected") - return FlowFileTransformResult("failure", contents="SSL Context Service property is not working as expected") + return FlowFileTransformResult("failure") return FlowFileTransformResult("success", contents="Check successful!") diff --git a/docker/test/integration/resources/python/TransferToOriginal.py b/docker/test/integration/resources/python/TransferToOriginal.py new file mode 100644 index 000000000..783a1e55c --- /dev/null +++ b/docker/test/integration/resources/python/TransferToOriginal.py @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult + + +class TransferToOriginal(FlowFileTransform): + def __init__(self, **kwargs): + pass + + def transform(self, context, flowFile): + return FlowFileTransformResult("original") diff --git a/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py b/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py index 429f60729..be17a266e 100644 --- a/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py +++ b/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py @@ -64,12 +64,20 @@ class FlowFileTransform(ProcessorBase): session.transfer(original_flow_file, self.REL_FAILURE) return + if result.getRelationship() == "original": + session.remove(flow_file) + self.logger.error("Result relationship cannot be 'original', it is reserved for the original flow file, and transferred automatically in non-failure cases.") + session.transfer(original_flow_file, self.REL_FAILURE) + return + result_attributes = result.getAttributes() if result.getRelationship() == "failure": session.remove(flow_file) if result_attributes is not None: for name, value in result_attributes.items(): original_flow_file.setAttribute(name, value) + if result.getContents() is not None: + self.logger.error("'failure' relationship should not have content, the original flow file will be transferred automatically in this case.") session.transfer(original_flow_file, self.REL_FAILURE) return
