This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new ccacb54068 NIFI-15057 Added documented example of Record handling with glom in Python
ccacb54068 is described below

commit ccacb54068a85fe74bef42d21334a75298ccb1f6
Author: Pierre Villard <[email protected]>
AuthorDate: Sun Oct 5 12:45:50 2025 +0200

    NIFI-15057 Added documented example of Record handling with glom in Python
    
    This closes #10391
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../src/main/asciidoc/python-developer-guide.adoc  | 27 +++++++
 .../PythonControllerInteractionIT.java             | 72 ++++++++++++++++-
 .../main/resources/extensions/HashRecordField.py   | 90 ++++++++++++++++++++++
 3 files changed, 188 insertions(+), 1 deletion(-)

diff --git a/nifi-docs/src/main/asciidoc/python-developer-guide.adoc b/nifi-docs/src/main/asciidoc/python-developer-guide.adoc
index f927ff8740..7e23587bcc 100644
--- a/nifi-docs/src/main/asciidoc/python-developer-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/python-developer-guide.adoc
@@ -283,6 +283,33 @@ If the partition has more than one field in the dictionary, all fields in the di
 the Records to be written to the same output FlowFile.
 
 
+==== Working with Nested Records
+
+Modifying only a small portion of an incoming Record is a common use case for `RecordTransform` Processors. A
+practical approach is to use a dictionary traversal library such as https://pypi.org/project/glom/[glom].
+Python Processors can declare third-party dependencies through the `dependencies` attribute of `ProcessorDetails`; NiFi downloads
+those packages when the Processor is first loaded.
+
+The sample `HashRecordField` Processor illustrates this pattern. It expects its Record Path property to contain a glom-style path
+(for example `customer.address.street` or `items.0.price`). That string can be passed directly to `glom(...)` to read the value and
+to `assign(...)` to update it, without hand-coding dictionary traversal logic:
+
+[source,python]
+----
+import hashlib
+
+from glom import assign, glom
+
+_MISSING = object()
+path_spec = 'my.example'  # Glom-style path
+# record is the Record passed to RecordTransform.transform()
+value = glom(record, path_spec, default=_MISSING)
+if value not in (None, _MISSING) and not isinstance(value, (dict, list)):
+    hashed = hashlib.sha256(str(value).encode('utf-8')).hexdigest()
+    assign(record, path_spec, hashed, missing=lambda: None)
+----
+
+In `HashRecordField`, the path comes from the Record Path property, for example `customer.address.street` or `items.0.price`.
+Because the same string is passed to both `glom(...)` and `assign(...)`, nested fields can be read and updated in place, and
+`RecordTransformResult` persists the modified Record using the configured Record Writer.
+
+
 [[flowfile-source]]
 === FlowFileSource
 
diff --git a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
index 5969992afc..0a486450a4 100644
--- a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
+++ b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
@@ -17,6 +17,8 @@
 
 package org.apache.nifi.py4j;
 
+import org.apache.commons.codec.digest.DigestUtils;
+
 import org.apache.nifi.components.AsyncLoadedProcessor;
 import org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
 import org.apache.nifi.components.state.Scope;
@@ -470,8 +472,75 @@ public class PythonControllerInteractionIT {
     }
 
 
+    @Test
+    public void testHashRecordFieldHappyPath() throws InitializationException {
+        final String json = "[{\"foo\":\"foo\",\"my\":{\"example\":\"value\"}}]";
+        final MockFlowFile out = runHashRecordField("my.example", json);
+
+        final String expectedHash = DigestUtils.sha256Hex("value");
+        out.assertContentEquals("[{\"foo\":\"foo\",\"my\":{\"example\":\"" + expectedHash + "\"}}]");
+    }
+
+    @Test
+    public void testHashRecordFieldMissingField() throws InitializationException {
+        final String json = "[{\"foo\":\"foo\",\"my\":{\"example\":\"value\"}}]";
+        final MockFlowFile out = runHashRecordField("does.not.exist", json);
+
+        // A path that does not resolve must leave the record untouched
+        out.assertContentEquals(json);
+    }
+
+    @Test
+    public void testHashRecordFieldNonScalar() throws InitializationException {
+        final String json = "[{\"foo\":\"foo\",\"my\":{\"example\":\"value\"}}]";
+        final MockFlowFile out = runHashRecordField("my", json);
+
+        // A path pointing at a nested record is skipped rather than hashed
+        out.assertContentEquals(json);
+    }
+
+    @Test
+    public void testHashRecordFieldLongValue() throws InitializationException {
+        final MockFlowFile out = runHashRecordField("count", "[{\"count\":7}]");
+
+        // Non-string scalars are hashed from their string form and written back as a string
+        final String expectedHash = DigestUtils.sha256Hex("7");
+        out.assertContentEquals("[{\"count\":\"" + expectedHash + "\"}]");
+    }
+
+    /**
+     * Runs the HashRecordField Python Processor once against the given JSON content and
+     * verifies that exactly one FlowFile was routed to each of success and original.
+     *
+     * @param recordPath glom-style path set as the Record Path property
+     * @param json JSON content enqueued as the single input FlowFile
+     * @return the FlowFile transferred to the success relationship
+     * @throws InitializationException if the record transform runner cannot be initialized
+     */
+    private MockFlowFile runHashRecordField(final String recordPath, final String json) throws InitializationException {
+        final TestRunner runner = createRecordTransformRunner("HashRecordField");
+        runner.setProperty("Record Path", recordPath);
+        runner.enqueue(json);
+        waitForValid(runner);
+        runner.run();
+
+        runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("original", 1);
+        return runner.getFlowFilesForRelationship("success").get(0);
+    }
+
+
     private TestRunner createRecordTransformRunner(final String type) throws 
InitializationException {
-        final TestRunner runner = createProcessor("SetRecordField");
+        final TestRunner runner = createProcessor(type);
         runner.setValidateExpressionUsage(false);
 
         final JsonTreeReader reader = new JsonTreeReader();
@@ -487,6 +556,7 @@ public class PythonControllerInteractionIT {
         return runner;
     }
 
+
     @Test
     public void testRecordTransformWithInnerRecord() throws 
InitializationException {
         // Create a SetRecordField Processor
diff --git a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-python-test-extensions/src/main/resources/extensions/HashRecordField.py b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-python-test-extensions/src/main/resources/extensions/HashRecordField.py
new file mode 100644
index 0000000000..7f73ccdbe5
--- /dev/null
+++ b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-python-test-extensions/src/main/resources/extensions/HashRecordField.py
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import hashlib
+
+from glom import assign, glom
+from glom.core import GlomError
+
+from nifiapi.properties import PropertyDescriptor, StandardValidators
+from nifiapi.recordtransform import RecordTransform, RecordTransformResult
+
+
+_MISSING = object()
+
+
+class HashRecordField(RecordTransform):
+    class Java:
+        implements = ['org.apache.nifi.python.processor.RecordTransform']
+
+    class ProcessorDetails:
+        version = '0.0.1-SNAPSHOT'
+        description = 'Hashes a record field using SHA-256.'
+        tags = ['record', 'hash', 'security']
+        dependencies = ['glom == 24.11.0']
+
+    def __init__(self, **kwargs):
+        super().__init__()
+        self.record_path = PropertyDescriptor(
+            name='Record Path',
+            description='Glom path identifying the field that should be hashed 
(for example "customer.address.street").',
+            required=True,
+            validators=[StandardValidators.NON_EMPTY_VALIDATOR]
+        )
+        self.descriptors = [self.record_path]
+
+    def getPropertyDescriptors(self):
+        return self.descriptors
+
+    def transform(self, context, record, schema, attributemap):
+        record_path_value = 
context.getProperty(self.record_path.name).getValue()
+        if not record_path_value:
+            return RecordTransformResult(record=record, schema=schema, 
relationship='success')
+
+        path_spec = record_path_value.strip()
+        if not path_spec:
+            self.logger.error('Record Path {} is empty after trimming 
whitespace'.format(record_path_value))
+            return RecordTransformResult(record=record, schema=schema, 
relationship='success')
+
+        try:
+            current_value = glom(record, path_spec, default=_MISSING)
+        except GlomError as exc:
+            self.logger.error('Failed to resolve path {}: 
{}'.format(path_spec, exc))
+            return RecordTransformResult(record=record, schema=schema, 
relationship='success')
+
+        if current_value is _MISSING:
+            self.logger.warn('Record Path {} did not resolve to a value; 
record left unchanged'.format(path_spec))
+            return RecordTransformResult(record=record, schema=schema, 
relationship='success')
+
+        if current_value is None:
+            return RecordTransformResult(record=record, schema=schema, 
relationship='success')
+
+        if isinstance(current_value, (dict, list)):
+            self.logger.warn('Record Path {} resolved to a non-scalar value; 
skipping hash operation'.format(path_spec))
+            return RecordTransformResult(record=record, schema=schema, 
relationship='success')
+
+        try:
+            value_as_string = current_value if isinstance(current_value, str) 
else str(current_value)
+            hashed = 
hashlib.sha256(value_as_string.encode('utf-8')).hexdigest()
+        except Exception as exc:
+            self.logger.error('Failed to hash value at {}: 
{}'.format(path_spec, exc))
+            return RecordTransformResult(record=record, schema=schema, 
relationship='success')
+
+        try:
+            assign(record, path_spec, hashed, missing=lambda: None)
+        except GlomError as exc:
+            self.logger.error('Failed to assign hashed value at {}: 
{}'.format(path_spec, exc))
+
+        return RecordTransformResult(record=record, schema=schema, 
relationship='success')

Reply via email to