This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 1e199675e43b3cb8fae7e6718ff039c76dc93ff6
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Wed Aug 21 13:36:04 2024 +0200

    MINIFICPP-2439 Restrict failure and original python relationship usage
    
    Closes #1852
    
    Signed-off-by: Martin Zink <[email protected]>
---
 docker/test/integration/cluster/ImageStore.py      |  8 +++++--
 docker/test/integration/features/python.feature    | 24 ++++++++++++++++++++
 .../minifi/processors/FailureWithContent.py        | 26 ++++++++++++++++++++++
 .../minifi/processors/TransferToOriginal.py        | 26 ++++++++++++++++++++++
 .../resources/python/FailureWithContent.py         | 24 ++++++++++++++++++++
 .../python/ProcessContextInterfaceChecker.py       | 16 ++++++-------
 .../resources/python/SpecialPropertyTypeChecker.py | 22 +++++++++---------
 .../resources/python/TransferToOriginal.py         | 24 ++++++++++++++++++++
 .../pythonprocessors/nifiapi/flowfiletransform.py  |  8 +++++++
 9 files changed, 157 insertions(+), 21 deletions(-)

diff --git a/docker/test/integration/cluster/ImageStore.py b/docker/test/integration/cluster/ImageStore.py
index 6983f36f6..ce0a694d4 100644
--- a/docker/test/integration/cluster/ImageStore.py
+++ b/docker/test/integration/cluster/ImageStore.py
@@ -169,6 +169,8 @@ class ImageStore:
                 COPY RelativeImporterProcessor.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/RelativeImporterProcessor.py
                 COPY multiplierutils.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/compute/processors/multiplierutils.py
                 COPY CreateNothing.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/CreateNothing.py
+                COPY FailureWithContent.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/FailureWithContent.py
+                COPY TransferToOriginal.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/TransferToOriginal.py
                 RUN wget {parse_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\
                     wget {chunk_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\
                     echo 'langchain<=0.17.0' > /opt/minifi/minifi-current/minifi-python/nifi_python_processors/requirements.txt && \\
@@ -195,8 +197,10 @@ class ImageStore:
                                                os.path.join(self.test_dir, "resources", "python", "FailureWithAttributes.py"),
                                                os.path.join(self.test_dir, "resources", "python", "RelativeImporterProcessor.py"),
                                                os.path.join(self.test_dir, "resources", "python", "subtractutils.py"),
-                                               os.path.join(self.test_dir, "resources", "python", "multiplierutils.py")])
-                                               os.path.join(self.test_dir, "resources", "python", "CreateNothing.py")])
+                                               os.path.join(self.test_dir, "resources", "python", "multiplierutils.py"),
+                                               os.path.join(self.test_dir, "resources", "python", "CreateNothing.py"),
+                                               os.path.join(self.test_dir, "resources", "python", "FailureWithContent.py"),
+                                               os.path.join(self.test_dir, "resources", "python", "TransferToOriginal.py")])
 
     def __build_http_proxy_image(self):
         dockerfile = dedent("""\
diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature
index 7acc83536..ab73628dd 100644
--- a/docker/test/integration/features/python.feature
+++ b/docker/test/integration/features/python.feature
@@ -198,3 +198,27 @@ Feature: MiNiFi can use python processors in its flows
     When the MiNiFi instance starts up
     Then no files are placed in the monitored directory in 10 seconds of running time
     And the Minifi logs do not contain the following message: "Caught Exception during SchedulingAgent::onTrigger of processor CreateNothing" after 1 seconds
+
+  @USE_NIFI_PYTHON_PROCESSORS
+  Scenario: NiFi native python processor cannot specify content of failure result
+    Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+    And a FailureWithContent processor
+    And python is installed on the MiNiFi agent with a pre-created virtualenv
+
+    And the "success" relationship of the GenerateFlowFile processor is connected to the FailureWithContent
+
+    When all instances start up
+
+    Then the Minifi logs contain the following message: "'failure' relationship should not have content, the original flow file will be transferred automatically in this case." in less than 60 seconds
+
+  @USE_NIFI_PYTHON_PROCESSORS
+  Scenario: NiFi native python processor cannot transfer to original relationship
+    Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+    And a TransferToOriginal processor
+    And python is installed on the MiNiFi agent with a pre-created virtualenv
+
+    And the "success" relationship of the GenerateFlowFile processor is connected to the TransferToOriginal
+
+    When all instances start up
+
+    Then the Minifi logs contain the following message: "Result relationship cannot be 'original', it is reserved for the original flow file, and transferred automatically in non-failure cases." in less than 60 seconds
diff --git a/docker/test/integration/minifi/processors/FailureWithContent.py b/docker/test/integration/minifi/processors/FailureWithContent.py
new file mode 100644
index 000000000..88384a792
--- /dev/null
+++ b/docker/test/integration/minifi/processors/FailureWithContent.py
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ..core.Processor import Processor
+
+
+class FailureWithContent(Processor):
+    def __init__(self, context):
+        super(FailureWithContent, self).__init__(
+            context=context,
+            clazz='FailureWithContent',
+            class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.',
+            properties={},
+            schedule={'scheduling strategy': 'EVENT_DRIVEN'},
+            auto_terminate=[])
diff --git a/docker/test/integration/minifi/processors/TransferToOriginal.py b/docker/test/integration/minifi/processors/TransferToOriginal.py
new file mode 100644
index 000000000..6155d4032
--- /dev/null
+++ b/docker/test/integration/minifi/processors/TransferToOriginal.py
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ..core.Processor import Processor
+
+
+class TransferToOriginal(Processor):
+    def __init__(self, context):
+        super(TransferToOriginal, self).__init__(
+            context=context,
+            clazz='TransferToOriginal',
+            class_prefix='org.apache.nifi.minifi.processors.nifi_python_processors.',
+            properties={},
+            schedule={'scheduling strategy': 'EVENT_DRIVEN'},
+            auto_terminate=[])
diff --git a/docker/test/integration/resources/python/FailureWithContent.py b/docker/test/integration/resources/python/FailureWithContent.py
new file mode 100644
index 000000000..a5e03d222
--- /dev/null
+++ b/docker/test/integration/resources/python/FailureWithContent.py
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
+
+
+class FailureWithContent(FlowFileTransform):
+    def __init__(self, **kwargs):
+        pass
+
+    def transform(self, context, flowFile):
+        return FlowFileTransformResult("failure", contents="Content?")
diff --git a/docker/test/integration/resources/python/ProcessContextInterfaceChecker.py b/docker/test/integration/resources/python/ProcessContextInterfaceChecker.py
index 83cf9f27a..5d0136ade 100644
--- a/docker/test/integration/resources/python/ProcessContextInterfaceChecker.py
+++ b/docker/test/integration/resources/python/ProcessContextInterfaceChecker.py
@@ -63,30 +63,30 @@ class ProcessContextInterfaceChecker(FlowFileTransform):
     def transform(self, context, flowFile):
         properties = context.getProperties()
         if len(properties) != 3:
-            return FlowFileTransformResult("failure", contents="Property count is invalid")
+            return FlowFileTransformResult("failure")
 
         property_names = [property.name for property in properties]
         if "Secret Password" not in property_names or "Request Timeout" not in property_names or "Wish Count" not in property_names:
-            return FlowFileTransformResult("failure", contents="Missing properties")
+            return FlowFileTransformResult("failure")
 
         for property in properties:
             if property.name == "Secret Password" and properties[property] != "mysecret":
-                return FlowFileTransformResult("failure", contents="Secret Password value is invalid")
+                return FlowFileTransformResult("failure")
             elif property.name == "Request Timeout" and properties[property] != "60 sec":
-                return FlowFileTransformResult("failure", contents="Request Timeout value is invalid")
+                return FlowFileTransformResult("failure")
             elif property.name == "Wish Count" and properties[property] != "3":
-                return FlowFileTransformResult("failure", contents="Wish Count value is invalid")
+                return FlowFileTransformResult("failure")
 
         secret_password = context.getProperty(self.SECRET_PASSWORD).getValue()
         if secret_password != "mysecret":
-            return FlowFileTransformResult("failure", contents="Secret password is invalid")
+            return FlowFileTransformResult("failure")
 
         timeout = context.getProperty(self.REQUEST_TIMEOUT).getValue()
         if timeout != "60 sec":
-            return FlowFileTransformResult("failure", contents="Request timeout is invalid")
+            return FlowFileTransformResult("failure")
 
         wish_count = context.getProperty(self.WISH_COUNT).getValue()
         if wish_count != "3":
-            return FlowFileTransformResult("failure", contents="Wish count is invalid")
+            return FlowFileTransformResult("failure")
 
         return FlowFileTransformResult("myrelationship", contents="Check successful!")
diff --git a/docker/test/integration/resources/python/SpecialPropertyTypeChecker.py b/docker/test/integration/resources/python/SpecialPropertyTypeChecker.py
index ce404a754..f9b0a35aa 100644
--- a/docker/test/integration/resources/python/SpecialPropertyTypeChecker.py
+++ b/docker/test/integration/resources/python/SpecialPropertyTypeChecker.py
@@ -56,57 +56,57 @@ class SpecialPropertyTypeChecker(FlowFileTransform):
         time_in_microseconds = context.getProperty(self.TIME_PERIOD_PROPERTY).asTimePeriod(TimeUnit.MICROSECONDS)
         if time_in_microseconds != 7200000000:
             self.logger.error("Time period property conversion to microseconds is not working as expected")
-            return FlowFileTransformResult("failure", contents="Time period property conversion to microseconds is not working as expected")
+            return FlowFileTransformResult("failure")
 
         time_in_milliseconds = context.getProperty(self.TIME_PERIOD_PROPERTY).asTimePeriod(TimeUnit.MILLISECONDS)
         if time_in_milliseconds != 7200000:
             self.logger.error("Time period property conversion to milliseconds is not working as expected")
-            return FlowFileTransformResult("failure", contents="Time period property conversion to milliseconds is not working as expected")
+            return FlowFileTransformResult("failure")
 
         time_in_seconds = context.getProperty(self.TIME_PERIOD_PROPERTY).asTimePeriod(TimeUnit.SECONDS)
         if time_in_seconds != 7200:
             self.logger.error("Time period property conversion to seconds is not working as expected")
-            return FlowFileTransformResult("failure", contents="Time period property conversion to seconds is not working as expected")
+            return FlowFileTransformResult("failure")
 
         time_in_minutes = context.getProperty(self.TIME_PERIOD_PROPERTY).asTimePeriod(TimeUnit.MINUTES)
         if time_in_minutes != 120:
             self.logger.error("Time period property conversion to minutes is not working as expected")
-            return FlowFileTransformResult("failure", contents="Time period property conversion to minutes is not working as expected")
+            return FlowFileTransformResult("failure")
 
         time_in_hours = context.getProperty(self.TIME_PERIOD_PROPERTY).asTimePeriod(TimeUnit.HOURS)
         if time_in_hours != 2:
             self.logger.error("Time period property conversion to hours is not working as expected")
-            return FlowFileTransformResult("failure", contents="Time period property conversion to hours is not working as expected")
+            return FlowFileTransformResult("failure")
 
         time_in_days = context.getProperty(self.TIME_PERIOD_PROPERTY).asTimePeriod(TimeUnit.DAYS)
         if time_in_days != 0:
             self.logger.error("Time period property conversion to days is not working as expected")
-            return FlowFileTransformResult("failure", contents="Time period property conversion to days is not working as expected")
+            return FlowFileTransformResult("failure")
 
         data_size_in_bytes = context.getProperty(self.DATA_SIZE_PROPERTY).asDataSize(DataUnit.B)
         if data_size_in_bytes != 104857600.0:
             self.logger.error("Data size property conversion to bytes is not working as expected")
-            return FlowFileTransformResult("failure", contents="Data size property conversion to bytes is not working as expected")
+            return FlowFileTransformResult("failure")
 
         data_size_in_kilobytes = context.getProperty(self.DATA_SIZE_PROPERTY).asDataSize(DataUnit.KB)
         if data_size_in_kilobytes != 102400.0:
             self.logger.error("Data size property conversion to kilobytes is not working as expected")
-            return FlowFileTransformResult("failure", contents="Data size property conversion to kilobytes is not working as expected")
+            return FlowFileTransformResult("failure")
 
         data_size_in_megabytes = context.getProperty(self.DATA_SIZE_PROPERTY).asDataSize(DataUnit.MB)
         if data_size_in_megabytes != 100.0:
             self.logger.error("Data size property conversion to megabytes is not working as expected")
-            return FlowFileTransformResult("failure", contents="Data size property conversion to megabytes is not working as expected")
+            return FlowFileTransformResult("failure")
 
         data_size_in_gigabytes = context.getProperty(self.DATA_SIZE_PROPERTY).asDataSize(DataUnit.GB)
         if data_size_in_gigabytes != 0.09765625:
             self.logger.error("Data size property conversion to gigabytes is not working as expected")
-            return FlowFileTransformResult("failure", contents="Data size property conversion to gigabytes is not working as expected")
+            return FlowFileTransformResult("failure")
 
         ssl_context = context.getProperty(self.SSL_CONTEXT_PROPERTY).asControllerService()
         cert = ssl_context.getCertificateFile()
         if cert != "/tmp/resources/minifi_client.crt":
             self.logger.error("SSL Context Service property is not working as expected")
-            return FlowFileTransformResult("failure", contents="SSL Context Service property is not working as expected")
+            return FlowFileTransformResult("failure")
 
         return FlowFileTransformResult("success", contents="Check successful!")
diff --git a/docker/test/integration/resources/python/TransferToOriginal.py b/docker/test/integration/resources/python/TransferToOriginal.py
new file mode 100644
index 000000000..783a1e55c
--- /dev/null
+++ b/docker/test/integration/resources/python/TransferToOriginal.py
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
+
+
+class TransferToOriginal(FlowFileTransform):
+    def __init__(self, **kwargs):
+        pass
+
+    def transform(self, context, flowFile):
+        return FlowFileTransformResult("original")
diff --git a/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py b/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py
index 429f60729..be17a266e 100644
--- a/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py
+++ b/extensions/python/pythonprocessors/nifiapi/flowfiletransform.py
@@ -64,12 +64,20 @@ class FlowFileTransform(ProcessorBase):
             session.transfer(original_flow_file, self.REL_FAILURE)
             return
 
+        if result.getRelationship() == "original":
+            session.remove(flow_file)
+            self.logger.error("Result relationship cannot be 'original', it is reserved for the original flow file, and transferred automatically in non-failure cases.")
+            session.transfer(original_flow_file, self.REL_FAILURE)
+            return
+
         result_attributes = result.getAttributes()
         if result.getRelationship() == "failure":
             session.remove(flow_file)
             if result_attributes is not None:
                 for name, value in result_attributes.items():
                     original_flow_file.setAttribute(name, value)
+            if result.getContents() is not None:
+                self.logger.error("'failure' relationship should not have content, the original flow file will be transferred automatically in this case.")
             session.transfer(original_flow_file, self.REL_FAILURE)
             return
 

Reply via email to