This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ccacb54068 NIFI-15057 Added documented example of Record handling with
glom in Python
ccacb54068 is described below
commit ccacb54068a85fe74bef42d21334a75298ccb1f6
Author: Pierre Villard <[email protected]>
AuthorDate: Sun Oct 5 12:45:50 2025 +0200
NIFI-15057 Added documented example of Record handling with glom in Python
This closes #10391
Signed-off-by: David Handermann <[email protected]>
---
.../src/main/asciidoc/python-developer-guide.adoc | 27 +++++++
.../PythonControllerInteractionIT.java | 72 ++++++++++++++++-
.../main/resources/extensions/HashRecordField.py | 90 ++++++++++++++++++++++
3 files changed, 188 insertions(+), 1 deletion(-)
diff --git a/nifi-docs/src/main/asciidoc/python-developer-guide.adoc
b/nifi-docs/src/main/asciidoc/python-developer-guide.adoc
index f927ff8740..7e23587bcc 100644
--- a/nifi-docs/src/main/asciidoc/python-developer-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/python-developer-guide.adoc
@@ -283,6 +283,33 @@ If the partition has more than one field in the
dictionary, all fields in the di
the Records to be written to the same output FlowFile.
+==== Working with Nested Records
+
+Modifying only a small portion of an incoming record is a common use case for `RecordTransform` processors. A
+practical approach is to rely on a dictionary traversal helper such as the https://pypi.org/project/glom/[glom] library.
+Python processors can declare third-party dependencies via the `dependencies`
attribute on `ProcessorDetails`; NiFi downloads
+those packages when the processor is first loaded.
+
+The sample `HashRecordField` processor illustrates the pattern. It expects the
processor configuration to supply a glom-style path
+(for example `customer.address.street` or `items.0.price`). That string can be
passed directly to `glom(...)` to read values and
+`assign(...)` to update them without hand-coding dictionary traversal logic:
+
+----
+from glom import assign, glom
+
+_MISSING = object()
+path_spec = 'my.example' # Glom-style path
+value = glom(record, path_spec, default=_MISSING)
+if value not in (None, _MISSING) and not isinstance(value, (dict, list)):
+ hashed = hashlib.sha256(str(value).encode('utf-8')).hexdigest()
+ assign(record, path_spec, hashed, missing=lambda: None)
+----
+
+The `HashRecordField` processor passes the trimmed value of its `Record Path` property to both `glom(...)` and `assign(...)`,
+so nested fields can be read and updated without custom traversal code. Missing paths, `None` values, and non-scalar
+values (dictionaries or lists) are left unchanged, and `RecordTransformResult` persists the modified record using the
+configured Record Writer.
+
+
[[flowfile-source]]
=== FlowFileSource
diff --git
a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
index 5969992afc..0a486450a4 100644
---
a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
+++
b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
@@ -17,6 +17,8 @@
package org.apache.nifi.py4j;
+import org.apache.commons.codec.digest.DigestUtils;
+
import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
import org.apache.nifi.components.state.Scope;
@@ -470,8 +472,75 @@ public class PythonControllerInteractionIT {
}
+    @Test
+    public void testHashRecordFieldHappyPath() throws InitializationException {
+        final MockFlowFile out = runHashRecordField("my.example", "[{\"foo\":\"foo\",\"my\":{\"example\":\"value\"}}]");
+        final String expectedHash = DigestUtils.sha256Hex("value");
+        out.assertContentEquals("[{\"foo\":\"foo\",\"my\":{\"example\":\"" + expectedHash + "\"}}]");
+    }
+
+    @Test
+    public void testHashRecordFieldMissingField() throws InitializationException {
+        final String json = "[{\"foo\":\"foo\",\"my\":{\"example\":\"value\"}}]";
+        // A path that does not resolve must leave the record untouched
+        runHashRecordField("does.not.exist", json).assertContentEquals(json);
+    }
+
+    @Test
+    public void testHashRecordFieldNonScalar() throws InitializationException {
+        final String json = "[{\"foo\":\"foo\",\"my\":{\"example\":\"value\"}}]";
+        // A path resolving to a nested record (not a scalar) must not be hashed
+        runHashRecordField("my", json).assertContentEquals(json);
+    }
+
+    @Test
+    public void testHashRecordFieldLongValue() throws InitializationException {
+        final MockFlowFile out = runHashRecordField("count", "[{\"count\":7}]");
+        // Non-string scalars are hashed from their string form, so the field is written back as a string
+        final String expectedHash = DigestUtils.sha256Hex("7");
+        out.assertContentEquals("[{\"count\":\"" + expectedHash + "\"}]");
+    }
+
+    /**
+     * Runs the HashRecordField processor once against the given JSON input and verifies the standard routing.
+     *
+     * @param recordPath glom-style path assigned to the "Record Path" property
+     * @param json JSON array of records to enqueue
+     * @return the single FlowFile transferred to the "success" relationship
+     */
+    private MockFlowFile runHashRecordField(final String recordPath, final String json) throws InitializationException {
+        final TestRunner runner = createRecordTransformRunner("HashRecordField");
+        runner.setProperty("Record Path", recordPath);
+
+        runner.enqueue(json);
+        waitForValid(runner);
+        runner.run();
+
+        runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("original", 1);
+        return runner.getFlowFilesForRelationship("success").get(0);
+    }
+
private TestRunner createRecordTransformRunner(final String type) throws
InitializationException {
- final TestRunner runner = createProcessor("SetRecordField");
+ final TestRunner runner = createProcessor(type);
runner.setValidateExpressionUsage(false);
final JsonTreeReader reader = new JsonTreeReader();
@@ -487,6 +556,7 @@ public class PythonControllerInteractionIT {
return runner;
}
+
@Test
public void testRecordTransformWithInnerRecord() throws
InitializationException {
// Create a SetRecordField Processor
diff --git
a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-python-test-extensions/src/main/resources/extensions/HashRecordField.py
b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-python-test-extensions/src/main/resources/extensions/HashRecordField.py
new file mode 100644
index 0000000000..7f73ccdbe5
--- /dev/null
+++
b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-python-test-extensions/src/main/resources/extensions/HashRecordField.py
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import hashlib
+
+from glom import assign, glom
+from glom.core import GlomError
+
+from nifiapi.properties import PropertyDescriptor, StandardValidators
+from nifiapi.recordtransform import RecordTransform, RecordTransformResult
+
+
+_MISSING = object()
+
+
+class HashRecordField(RecordTransform):
+ class Java:
+ implements = ['org.apache.nifi.python.processor.RecordTransform']
+
+ class ProcessorDetails:
+ version = '0.0.1-SNAPSHOT'
+ description = 'Hashes a record field using SHA-256.'
+ tags = ['record', 'hash', 'security']
+ dependencies = ['glom == 24.11.0']
+
+ def __init__(self, **kwargs):
+ super().__init__()
+ self.record_path = PropertyDescriptor(
+ name='Record Path',
+ description='Glom path identifying the field that should be hashed
(for example "customer.address.street").',
+ required=True,
+ validators=[StandardValidators.NON_EMPTY_VALIDATOR]
+ )
+ self.descriptors = [self.record_path]
+
+ def getPropertyDescriptors(self):
+ return self.descriptors
+
+ def transform(self, context, record, schema, attributemap):
+ record_path_value =
context.getProperty(self.record_path.name).getValue()
+ if not record_path_value:
+ return RecordTransformResult(record=record, schema=schema,
relationship='success')
+
+ path_spec = record_path_value.strip()
+ if not path_spec:
+ self.logger.error('Record Path {} is empty after trimming
whitespace'.format(record_path_value))
+ return RecordTransformResult(record=record, schema=schema,
relationship='success')
+
+ try:
+ current_value = glom(record, path_spec, default=_MISSING)
+ except GlomError as exc:
+ self.logger.error('Failed to resolve path {}:
{}'.format(path_spec, exc))
+ return RecordTransformResult(record=record, schema=schema,
relationship='success')
+
+ if current_value is _MISSING:
+ self.logger.warn('Record Path {} did not resolve to a value;
record left unchanged'.format(path_spec))
+ return RecordTransformResult(record=record, schema=schema,
relationship='success')
+
+ if current_value is None:
+ return RecordTransformResult(record=record, schema=schema,
relationship='success')
+
+ if isinstance(current_value, (dict, list)):
+ self.logger.warn('Record Path {} resolved to a non-scalar value;
skipping hash operation'.format(path_spec))
+ return RecordTransformResult(record=record, schema=schema,
relationship='success')
+
+ try:
+ value_as_string = current_value if isinstance(current_value, str)
else str(current_value)
+ hashed =
hashlib.sha256(value_as_string.encode('utf-8')).hexdigest()
+ except Exception as exc:
+ self.logger.error('Failed to hash value at {}:
{}'.format(path_spec, exc))
+ return RecordTransformResult(record=record, schema=schema,
relationship='success')
+
+ try:
+ assign(record, path_spec, hashed, missing=lambda: None)
+ except GlomError as exc:
+ self.logger.error('Failed to assign hashed value at {}:
{}'.format(path_spec, exc))
+
+ return RecordTransformResult(record=record, schema=schema,
relationship='success')