This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e84d6f676 NIFI-14337 Enhanced JoltTransformJSON to Support Transform on Attributes
7e84d6f676 is described below

commit 7e84d6f676c815c866f0601277703272e4a3e301
Author: SriRam <[email protected]>
AuthorDate: Fri Mar 14 10:53:19 2025 +1100

    NIFI-14337 Enhanced JoltTransformJSON to Support Transform on Attributes
    
    This closes #9785
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi/processors/jolt/JoltTransformJSON.java    | 84 +++++++++++++++++-----
 .../nifi/processors/jolt/JsonSourceStrategy.java   | 46 ++++++++++++
 .../processors/jolt/TestJoltTransformJSON.java     | 81 +++++++++++++++++----
 3 files changed, 178 insertions(+), 33 deletions(-)

diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java
index afc72bca10..669827f30d 100644
--- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java
+++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java
@@ -30,6 +30,7 @@ import 
org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.jolt.util.TransformUtils;
@@ -41,6 +42,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
 import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
 
 import java.io.InputStream;
@@ -55,11 +57,28 @@ import java.util.stream.Stream;
 @Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr", 
"cardinality", "sort"})
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @WritesAttribute(attribute = "mime.type", description = "Always set to 
application/json")
-@CapabilityDescription("Applies a list of Jolt specifications to the flowfile 
JSON payload. A new FlowFile is created "
-        + "with transformed content and is routed to the 'success' 
relationship. If the JSON transform "
-        + "fails, the original FlowFile is routed to the 'failure' 
relationship.")
+@CapabilityDescription("Applies a list of Jolt specifications to either the 
FlowFile JSON content or a specified FlowFile JSON attribute. "
+        + "If the JSON transform fails, the original FlowFile is routed to the 
'failure' relationship.")
 @RequiresInstanceClassLoading
 public class JoltTransformJSON extends AbstractJoltTransform {
+
+    public static final PropertyDescriptor JSON_SOURCE = new 
PropertyDescriptor.Builder()
+            .name("JSON Source")
+            .description("Specifies whether the Jolt transformation is applied 
to FlowFile JSON content or to specified FlowFile JSON attribute.")
+            .required(true)
+            .allowableValues(JsonSourceStrategy.class)
+            .defaultValue(JsonSourceStrategy.FLOW_FILE)
+            .build();
+
+    public static final PropertyDescriptor JSON_SOURCE_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("JSON Source Attribute")
+            .description("The FlowFile attribute containing JSON to be 
transformed.")
+            .dependsOn(JSON_SOURCE, JsonSourceStrategy.ATTRIBUTE)
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+            .build();
+
     public static final PropertyDescriptor PRETTY_PRINT = new 
PropertyDescriptor.Builder()
             .name("Pretty Print")
             .displayName("Pretty Print")
@@ -80,16 +99,18 @@ public class JoltTransformJSON extends 
AbstractJoltTransform {
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
-            .description("The FlowFile with transformed content will be routed 
to this relationship")
+            .description("The FlowFile with successfully transformed content 
or updated attribute will be routed to this relationship")
             .build();
     public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
-            .description("If a FlowFile fails processing for any reason (for 
example, the FlowFile is not valid JSON), it will be routed to this 
relationship")
+            .description("If the JSON transformation fails (e.g., due to 
invalid JSON in the content or attribute), the original FlowFile is routed to 
this relationship.")
             .build();
 
     private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
Stream.concat(
             getCommonPropertyDescriptors().stream(),
             Stream.of(
+                    JSON_SOURCE,
+                    JSON_SOURCE_ATTRIBUTE,
                     PRETTY_PRINT,
                     MAX_STRING_LENGTH
             )
@@ -122,14 +143,34 @@ public class JoltTransformJSON extends 
AbstractJoltTransform {
 
         final ComponentLog logger = getLogger();
         final StopWatch stopWatch = new StopWatch(true);
-
         final Object inputJson;
-        try (final InputStream in = session.read(original)) {
-            inputJson = jsonUtil.jsonToObject(in);
-        } catch (final Exception e) {
-            logger.error("JSON parsing failed for {}", original, e);
-            session.transfer(original, REL_FAILURE);
-            return;
+        final boolean sourceStrategyFlowFile = JsonSourceStrategy.FLOW_FILE == 
context.getProperty(JSON_SOURCE).asAllowableValue(JsonSourceStrategy.class);
+        String jsonSourceAttributeName = null;
+
+        if (sourceStrategyFlowFile) {
+            try (final InputStream in = session.read(original)) {
+                inputJson = jsonUtil.jsonToObject(in);
+            } catch (final Exception e) {
+                logger.error("JSON parsing failed on FlowFile content for {}", 
original, e);
+                session.transfer(original, REL_FAILURE);
+                return;
+            }
+        } else {
+            jsonSourceAttributeName = 
context.getProperty(JSON_SOURCE_ATTRIBUTE).getValue();
+            final String jsonSourceAttributeValue = 
original.getAttribute(jsonSourceAttributeName);
+            if (StringUtils.isBlank(jsonSourceAttributeValue)) {
+                logger.error("FlowFile attribute '{}' value is blank", 
jsonSourceAttributeName);
+                session.transfer(original, REL_FAILURE);
+                return;
+            } else {
+                try {
+                    inputJson = 
jsonUtil.jsonToObject(jsonSourceAttributeValue);
+                } catch (final Exception e) {
+                    logger.error("JSON parsing failed on attribute '{}' of 
FlowFile {}", jsonSourceAttributeName, original, e);
+                    session.transfer(original, REL_FAILURE);
+                    return;
+                }
+            }
         }
 
         final String jsonString;
@@ -152,13 +193,18 @@ public class JoltTransformJSON extends 
AbstractJoltTransform {
             }
         }
 
-        FlowFile transformed = session.write(original, out -> 
out.write(jsonString.getBytes(StandardCharsets.UTF_8)));
-
-        final String transformType = 
context.getProperty(JOLT_TRANSFORM).getValue();
-        transformed = session.putAttribute(transformed, 
CoreAttributes.MIME_TYPE.key(), "application/json");
-        session.transfer(transformed, REL_SUCCESS);
-        session.getProvenanceReporter().modifyContent(transformed, "Modified 
With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-        logger.info("Transform completed for {}", original);
+        if (sourceStrategyFlowFile) {
+            FlowFile transformed = session.write(original, out -> 
out.write(jsonString.getBytes(StandardCharsets.UTF_8)));
+            final String transformType = 
context.getProperty(JOLT_TRANSFORM).getValue();
+            transformed = session.putAttribute(transformed, 
CoreAttributes.MIME_TYPE.key(), "application/json");
+            session.transfer(transformed, REL_SUCCESS);
+            session.getProvenanceReporter().modifyContent(transformed, 
"Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            logger.info("Transform completed on FlowFile content for {}", 
original);
+        } else {
+            session.putAttribute(original, jsonSourceAttributeName, 
jsonString);
+            session.transfer(original, REL_SUCCESS);
+            logger.info("Transform completed on attribute '{}' of FlowFile 
{}", jsonSourceAttributeName, original);
+        }
     }
 
     @OnScheduled
diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JsonSourceStrategy.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JsonSourceStrategy.java
new file mode 100644
index 0000000000..898142c4f7
--- /dev/null
+++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JsonSourceStrategy.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.jolt;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum JsonSourceStrategy implements DescribedValue {
+    FLOW_FILE("Transformation applied to FlowFile content containing JSON"),
+    ATTRIBUTE("Transformation applied to FlowFile attribute containing JSON");
+
+    private final String description;
+
+    JsonSourceStrategy(final String description) {
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return name();
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformJSON.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformJSON.java
index 593e4dc518..9912cf5e61 100644
--- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformJSON.java
+++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformJSON.java
@@ -51,6 +51,11 @@ class TestJoltTransformJSON {
     final static Path JSON_INPUT = 
Paths.get("src/test/resources/TestJoltTransformJson/input.json");
     final static Diffy DIFFY = new Diffy();
     final static String CHAINR_SPEC_PATH = 
"src/test/resources/specs/chainrSpec.json";
+    final static String SHIFTR_SPEC_PATH = 
"src/test/resources/specs/shiftrSpec.json";
+    final static String SHIFTR_JSON_OUTPUT = "shiftrOutput.json";
+    final static String CHAINR_JSON_OUTPUT = "chainrOutput.json";
+    private static final String JSON_SOURCE_ATTR_NAME = "jsonSourceAttr";
+
     static String chainrSpecContents;
     private Processor processor;
     private TestRunner runner;
@@ -199,36 +204,35 @@ class TestJoltTransformJSON {
 
     @ParameterizedTest(name = "{index} {1}")
     @MethodSource("getChainrArguments")
-    /*NOTE: Even though description is not used in the actual test, it needs 
to be declared in order to use it in the ParameterizedTest name argument*/
+        /*NOTE: Even though description is not used in the actual test, it 
needs to be declared in order to use it in the ParameterizedTest name argument*/
     void testTransformInputWithChainr(Path specPath, String 
ignoredDescription) throws IOException {
         final String spec = Files.readString(specPath);
         runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
         runner.enqueue(JSON_INPUT);
         runner.run();
 
-        assertTransformedEquals("chainrOutput.json");
+        assertTransformedEquals(CHAINR_JSON_OUTPUT);
     }
 
     @Test
     void testTransformInputWithShiftr() throws IOException {
-        final String spec = 
Files.readString(Paths.get("src/test/resources/specs/shiftrSpec.json"));
+        final String spec = Files.readString(Paths.get(SHIFTR_SPEC_PATH));
         runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
         runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, 
JoltTransformStrategy.SHIFTR);
         runner.enqueue(JSON_INPUT);
         runner.run();
 
-        assertTransformedEquals("shiftrOutput.json");
+        assertTransformedEquals(SHIFTR_JSON_OUTPUT);
     }
 
     @Test
     void testTransformInputWithShiftrFromFile() throws IOException {
-        final String spec = "./src/test/resources/specs/shiftrSpec.json";
-        runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
+        runner.setProperty(JoltTransformJSON.JOLT_SPEC, SHIFTR_SPEC_PATH);
         runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, 
JoltTransformStrategy.SHIFTR);
         runner.enqueue(JSON_INPUT);
         runner.run();
 
-        assertTransformedEquals("shiftrOutput.json");
+        assertTransformedEquals(SHIFTR_JSON_OUTPUT);
     }
 
     @Test
@@ -243,7 +247,7 @@ class TestJoltTransformJSON {
         runner.enqueue(JSON_INPUT, attributes);
         runner.run();
 
-        assertTransformedEquals("shiftrOutput.json");
+        assertTransformedEquals(SHIFTR_JSON_OUTPUT);
     }
 
     String addAccentedChars(String input) {
@@ -252,13 +256,13 @@ class TestJoltTransformJSON {
 
     @Test
     void testTransformInputWithShiftrAccentedChars() throws IOException {
-        final String spec = 
addAccentedChars(Files.readString(Paths.get("src/test/resources/specs/shiftrSpec.json")));
+        final String spec = 
addAccentedChars(Files.readString(Paths.get(SHIFTR_SPEC_PATH)));
         runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
         runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, 
JoltTransformStrategy.SHIFTR);
         runner.enqueue(addAccentedChars(Files.readString(JSON_INPUT)));
         runner.run();
 
-        assertTransformedEquals("shiftrOutput.json");
+        assertTransformedEquals(SHIFTR_JSON_OUTPUT);
     }
 
     @Test
@@ -382,7 +386,7 @@ class TestJoltTransformJSON {
         runner.enqueue(JSON_INPUT);
         runner.run();
 
-        assertTransformedEquals("chainrOutput.json");
+        assertTransformedEquals(CHAINR_JSON_OUTPUT);
     }
 
     @Test
@@ -401,7 +405,7 @@ class TestJoltTransformJSON {
         runner.enqueue(JSON_INPUT, customSpecs);
         runner.run();
 
-        assertTransformedEquals("chainrOutput.json");
+        assertTransformedEquals(CHAINR_JSON_OUTPUT);
     }
 
     @Test
@@ -414,7 +418,7 @@ class TestJoltTransformJSON {
         runner.enqueue(JSON_INPUT);
         runner.run();
 
-        assertTransformedEquals("chainrOutput.json");
+        assertTransformedEquals(CHAINR_JSON_OUTPUT);
     }
 
     @Test
@@ -426,7 +430,7 @@ class TestJoltTransformJSON {
         runner.enqueue(JSON_INPUT);
         runner.run();
 
-        assertTransformedEquals("chainrOutput.json");
+        assertTransformedEquals(CHAINR_JSON_OUTPUT);
     }
 
     @Test
@@ -464,6 +468,55 @@ class TestJoltTransformJSON {
         runner.assertNotValid();
     }
 
+    private static Stream<Arguments> provideJsonSourceAttributeArguments() {
+        String INVALID_INPUT_JSON = 
"{\"rating\":{\"primary\":{\"value\":3},\"series\":{\"value\":[5,4]},\"quality\":{\"value\":}}}";
+        String EXPECTED_JSON = 
"{\"rating\":{\"primary\":{\"value\":3},\"series\":{\"value\":[5,4]},\"quality\":{\"value\":3}}}";
+
+        return Stream.of(
+                Arguments.argumentSet("testJsonAttributeNotInitialised", 
JSON_SOURCE_ATTR_NAME, null,
+                        SHIFTR_SPEC_PATH, JoltTransformStrategy.SHIFTR, false, 
null),
+                Arguments.argumentSet("testInvalidJsonAttribute", 
JSON_SOURCE_ATTR_NAME, Map.of(JSON_SOURCE_ATTR_NAME, INVALID_INPUT_JSON),
+                        SHIFTR_SPEC_PATH, JoltTransformStrategy.SHIFTR, false, 
null),
+                Arguments.argumentSet("testValidJsonAttribute", 
JSON_SOURCE_ATTR_NAME, Map.of(JSON_SOURCE_ATTR_NAME, EXPECTED_JSON),
+                        CHAINR_SPEC_PATH, JoltTransformStrategy.CHAINR, true, 
CHAINR_JSON_OUTPUT)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideJsonSourceAttributeArguments")
+    void testJsonSourceAttribute(String jsonSourceAttribute,
+                                 Map<String, String> flowFileAttributes,
+                                 String joltSpec,
+                                 JoltTransformStrategy joltStrategy,
+                                 boolean expectSuccess,
+                                 String expectedOutputFile) throws IOException 
{
+        runner.setProperty(JoltTransformJSON.JOLT_SPEC, joltSpec);
+        runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, joltStrategy);
+        runner.setProperty(JoltTransformJSON.JSON_SOURCE, 
JsonSourceStrategy.ATTRIBUTE);
+        runner.setProperty(JoltTransformJSON.JSON_SOURCE_ATTRIBUTE, 
jsonSourceAttribute);
+        runner.enqueue(JSON_INPUT, flowFileAttributes != null ? 
flowFileAttributes : Collections.emptyMap());
+        runner.run();
+
+        if (expectSuccess) {
+            assertTransformedJsonAttributeEquals(expectedOutputFile);
+        } else {
+            
runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_FAILURE);
+        }
+    }
+
+    private void assertTransformedJsonAttributeEquals(final String 
expectedOutputContent) throws IOException {
+        runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
+
+        final MockFlowFile transformed = 
runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).getFirst();
+        transformed.assertAttributeExists(JSON_SOURCE_ATTR_NAME);
+
+        final Object transformedJson = 
JsonUtils.jsonToObject(transformed.getAttribute(JSON_SOURCE_ATTR_NAME));
+
+        final String compareOutputPath = 
"src/test/resources/TestJoltTransformJson/%s".formatted(expectedOutputContent);
+        final Object compareJson = 
JsonUtils.jsonToObject(Files.newInputStream(Paths.get(compareOutputPath)));
+        assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
+    }
+
     private void assertTransformedEquals(final String expectedOutputFilename) 
throws IOException {
         runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
 

Reply via email to