This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 7e84d6f676 NIFI-14337 Enhanced JoltTransformJSON to Support Transform
on Attributes
7e84d6f676 is described below
commit 7e84d6f676c815c866f0601277703272e4a3e301
Author: SriRam <[email protected]>
AuthorDate: Fri Mar 14 10:53:19 2025 +1100
NIFI-14337 Enhanced JoltTransformJSON to Support Transform on Attributes
This closes #9785
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/processors/jolt/JoltTransformJSON.java | 84 +++++++++++++++++-----
.../nifi/processors/jolt/JsonSourceStrategy.java | 46 ++++++++++++
.../processors/jolt/TestJoltTransformJSON.java | 81 +++++++++++++++++----
3 files changed, 178 insertions(+), 33 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java
b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java
index afc72bca10..669827f30d 100644
---
a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java
+++
b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java
@@ -30,6 +30,7 @@ import
org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.jolt.util.TransformUtils;
@@ -41,6 +42,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import java.io.InputStream;
@@ -55,11 +57,28 @@ import java.util.stream.Stream;
@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr",
"cardinality", "sort"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttribute(attribute = "mime.type", description = "Always set to
application/json")
-@CapabilityDescription("Applies a list of Jolt specifications to the flowfile
JSON payload. A new FlowFile is created "
- + "with transformed content and is routed to the 'success'
relationship. If the JSON transform "
- + "fails, the original FlowFile is routed to the 'failure'
relationship.")
+@CapabilityDescription("Applies a list of Jolt specifications to either the
FlowFile JSON content or a specified FlowFile JSON attribute. "
+ + "If the JSON transform fails, the original FlowFile is routed to the
'failure' relationship.")
@RequiresInstanceClassLoading
public class JoltTransformJSON extends AbstractJoltTransform {
+
+ /**
+ * Selects the JSON input for the transformation: FlowFile content ({@code FLOW_FILE}, the default)
+ * or a single FlowFile attribute ({@code ATTRIBUTE}) whose name is given by JSON Source Attribute.
+ */
+ public static final PropertyDescriptor JSON_SOURCE = new
PropertyDescriptor.Builder()
+ .name("JSON Source")
+ .description("Specifies whether the Jolt transformation is applied
to FlowFile JSON content or to specified FlowFile JSON attribute.")
+ .required(true)
+ .allowableValues(JsonSourceStrategy.class)
+ .defaultValue(JsonSourceStrategy.FLOW_FILE)
+ .build();
+
+ /**
+ * Name of the FlowFile attribute whose JSON value is transformed and then overwritten with the
+ * transformed JSON. Only shown when JSON Source is {@code ATTRIBUTE}. Expression Language is not
+ * supported, so the configured value is used literally as the attribute name.
+ */
+ public static final PropertyDescriptor JSON_SOURCE_ATTRIBUTE = new
PropertyDescriptor.Builder()
+ .name("JSON Source Attribute")
+ .description("The FlowFile attribute containing JSON to be
transformed.")
+ .dependsOn(JSON_SOURCE, JsonSourceStrategy.ATTRIBUTE)
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+ .build();
+
public static final PropertyDescriptor PRETTY_PRINT = new
PropertyDescriptor.Builder()
.name("Pretty Print")
.displayName("Pretty Print")
@@ -80,16 +99,18 @@ public class JoltTransformJSON extends
AbstractJoltTransform {
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
- .description("The FlowFile with transformed content will be routed
to this relationship")
+ .description("The FlowFile with successfully transformed content
or updated attribute will be routed to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
- .description("If a FlowFile fails processing for any reason (for
example, the FlowFile is not valid JSON), it will be routed to this
relationship")
+ .description("If the JSON transformation fails (e.g., due to
invalid JSON in the content or attribute), the original FlowFile is routed to
this relationship.")
.build();
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
Stream.concat(
getCommonPropertyDescriptors().stream(),
Stream.of(
+ JSON_SOURCE,
+ JSON_SOURCE_ATTRIBUTE,
PRETTY_PRINT,
MAX_STRING_LENGTH
)
@@ -122,14 +143,34 @@ public class JoltTransformJSON extends
AbstractJoltTransform {
final ComponentLog logger = getLogger();
final StopWatch stopWatch = new StopWatch(true);
-
final Object inputJson;
- try (final InputStream in = session.read(original)) {
- inputJson = jsonUtil.jsonToObject(in);
- } catch (final Exception e) {
- logger.error("JSON parsing failed for {}", original, e);
- session.transfer(original, REL_FAILURE);
- return;
+ final boolean sourceStrategyFlowFile = JsonSourceStrategy.FLOW_FILE ==
context.getProperty(JSON_SOURCE).asAllowableValue(JsonSourceStrategy.class);
+ String jsonSourceAttributeName = null;
+
+ if (sourceStrategyFlowFile) {
+ try (final InputStream in = session.read(original)) {
+ inputJson = jsonUtil.jsonToObject(in);
+ } catch (final Exception e) {
+ logger.error("JSON parsing failed on FlowFile content for {}",
original, e);
+ session.transfer(original, REL_FAILURE);
+ return;
+ }
+ } else {
+ jsonSourceAttributeName =
context.getProperty(JSON_SOURCE_ATTRIBUTE).getValue();
+ final String jsonSourceAttributeValue =
original.getAttribute(jsonSourceAttributeName);
+ if (StringUtils.isBlank(jsonSourceAttributeValue)) {
+ logger.error("FlowFile attribute '{}' value is blank",
jsonSourceAttributeName);
+ session.transfer(original, REL_FAILURE);
+ return;
+ } else {
+ try {
+ inputJson =
jsonUtil.jsonToObject(jsonSourceAttributeValue);
+ } catch (final Exception e) {
+ logger.error("JSON parsing failed on attribute '{}' of
FlowFile {}", jsonSourceAttributeName, original, e);
+ session.transfer(original, REL_FAILURE);
+ return;
+ }
+ }
}
final String jsonString;
@@ -152,13 +193,18 @@ public class JoltTransformJSON extends
AbstractJoltTransform {
}
}
- FlowFile transformed = session.write(original, out ->
out.write(jsonString.getBytes(StandardCharsets.UTF_8)));
-
- final String transformType =
context.getProperty(JOLT_TRANSFORM).getValue();
- transformed = session.putAttribute(transformed,
CoreAttributes.MIME_TYPE.key(), "application/json");
- session.transfer(transformed, REL_SUCCESS);
- session.getProvenanceReporter().modifyContent(transformed, "Modified
With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- logger.info("Transform completed for {}", original);
+ if (sourceStrategyFlowFile) {
+ FlowFile transformed = session.write(original, out ->
out.write(jsonString.getBytes(StandardCharsets.UTF_8)));
+ final String transformType =
context.getProperty(JOLT_TRANSFORM).getValue();
+ transformed = session.putAttribute(transformed,
CoreAttributes.MIME_TYPE.key(), "application/json");
+ session.transfer(transformed, REL_SUCCESS);
+ session.getProvenanceReporter().modifyContent(transformed,
"Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ logger.info("Transform completed on FlowFile content for {}",
original);
+ } else {
+ session.putAttribute(original, jsonSourceAttributeName,
jsonString);
+ session.transfer(original, REL_SUCCESS);
+ logger.info("Transform completed on attribute '{}' of FlowFile
{}", jsonSourceAttributeName, original);
+ }
}
@OnScheduled
diff --git
a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JsonSourceStrategy.java
b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JsonSourceStrategy.java
new file mode 100644
index 0000000000..898142c4f7
--- /dev/null
+++
b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JsonSourceStrategy.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.jolt;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Allowable values for the JoltTransformJSON "JSON Source" property: selects whether the Jolt
+ * transformation reads from and writes to the FlowFile content or a single FlowFile attribute.
+ */
+public enum JsonSourceStrategy implements DescribedValue {
+ /** Parse the FlowFile content as JSON and replace the content with the transformed JSON. */
+ FLOW_FILE("Transformation applied to FlowFile content containing JSON"),
+ /** Parse the attribute named by "JSON Source Attribute" and overwrite that attribute with the transformed JSON. */
+ ATTRIBUTE("Transformation applied to FlowFile attribute containing JSON");
+
+ // Human-readable explanation returned by getDescription().
+ private final String description;
+
+ JsonSourceStrategy(final String description) {
+ this.description = description;
+ }
+
+ // The constant name (e.g. "FLOW_FILE") is used as the property value.
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ // The constant name is also shown as the display name.
+ @Override
+ public String getDisplayName() {
+ return name();
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformJSON.java
b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformJSON.java
index 593e4dc518..9912cf5e61 100644
---
a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformJSON.java
+++
b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformJSON.java
@@ -51,6 +51,11 @@ class TestJoltTransformJSON {
final static Path JSON_INPUT =
Paths.get("src/test/resources/TestJoltTransformJson/input.json");
final static Diffy DIFFY = new Diffy();
final static String CHAINR_SPEC_PATH =
"src/test/resources/specs/chainrSpec.json";
+ final static String SHIFTR_SPEC_PATH =
"src/test/resources/specs/shiftrSpec.json";
+ final static String SHIFTR_JSON_OUTPUT = "shiftrOutput.json";
+ final static String CHAINR_JSON_OUTPUT = "chainrOutput.json";
+ private static final String JSON_SOURCE_ATTR_NAME = "jsonSourceAttr";
+
static String chainrSpecContents;
private Processor processor;
private TestRunner runner;
@@ -199,36 +204,35 @@ class TestJoltTransformJSON {
@ParameterizedTest(name = "{index} {1}")
@MethodSource("getChainrArguments")
- /*NOTE: Even though description is not used in the actual test, it needs
to be declared in order to use it in the ParameterizedTest name argument*/
+ /*NOTE: Even though description is not used in the actual test, it
needs to be declared in order to use it in the ParameterizedTest name argument*/
void testTransformInputWithChainr(Path specPath, String
ignoredDescription) throws IOException {
final String spec = Files.readString(specPath);
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.enqueue(JSON_INPUT);
runner.run();
- assertTransformedEquals("chainrOutput.json");
+ assertTransformedEquals(CHAINR_JSON_OUTPUT);
}
@Test
void testTransformInputWithShiftr() throws IOException {
- final String spec =
Files.readString(Paths.get("src/test/resources/specs/shiftrSpec.json"));
+ final String spec = Files.readString(Paths.get(SHIFTR_SPEC_PATH));
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,
JoltTransformStrategy.SHIFTR);
runner.enqueue(JSON_INPUT);
runner.run();
- assertTransformedEquals("shiftrOutput.json");
+ assertTransformedEquals(SHIFTR_JSON_OUTPUT);
}
@Test
void testTransformInputWithShiftrFromFile() throws IOException {
- final String spec = "./src/test/resources/specs/shiftrSpec.json";
- runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
+ runner.setProperty(JoltTransformJSON.JOLT_SPEC, SHIFTR_SPEC_PATH);
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,
JoltTransformStrategy.SHIFTR);
runner.enqueue(JSON_INPUT);
runner.run();
- assertTransformedEquals("shiftrOutput.json");
+ assertTransformedEquals(SHIFTR_JSON_OUTPUT);
}
@Test
@@ -243,7 +247,7 @@ class TestJoltTransformJSON {
runner.enqueue(JSON_INPUT, attributes);
runner.run();
- assertTransformedEquals("shiftrOutput.json");
+ assertTransformedEquals(SHIFTR_JSON_OUTPUT);
}
String addAccentedChars(String input) {
@@ -252,13 +256,13 @@ class TestJoltTransformJSON {
@Test
void testTransformInputWithShiftrAccentedChars() throws IOException {
- final String spec =
addAccentedChars(Files.readString(Paths.get("src/test/resources/specs/shiftrSpec.json")));
+ final String spec =
addAccentedChars(Files.readString(Paths.get(SHIFTR_SPEC_PATH)));
runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,
JoltTransformStrategy.SHIFTR);
runner.enqueue(addAccentedChars(Files.readString(JSON_INPUT)));
runner.run();
- assertTransformedEquals("shiftrOutput.json");
+ assertTransformedEquals(SHIFTR_JSON_OUTPUT);
}
@Test
@@ -382,7 +386,7 @@ class TestJoltTransformJSON {
runner.enqueue(JSON_INPUT);
runner.run();
- assertTransformedEquals("chainrOutput.json");
+ assertTransformedEquals(CHAINR_JSON_OUTPUT);
}
@Test
@@ -401,7 +405,7 @@ class TestJoltTransformJSON {
runner.enqueue(JSON_INPUT, customSpecs);
runner.run();
- assertTransformedEquals("chainrOutput.json");
+ assertTransformedEquals(CHAINR_JSON_OUTPUT);
}
@Test
@@ -414,7 +418,7 @@ class TestJoltTransformJSON {
runner.enqueue(JSON_INPUT);
runner.run();
- assertTransformedEquals("chainrOutput.json");
+ assertTransformedEquals(CHAINR_JSON_OUTPUT);
}
@Test
@@ -426,7 +430,7 @@ class TestJoltTransformJSON {
runner.enqueue(JSON_INPUT);
runner.run();
- assertTransformedEquals("chainrOutput.json");
+ assertTransformedEquals(CHAINR_JSON_OUTPUT);
}
@Test
@@ -464,6 +468,55 @@ class TestJoltTransformJSON {
runner.assertNotValid();
}
+    /**
+     * Supplies argument sets for {@link #testJsonSourceAttribute}. Each set contains: the JSON Source
+     * Attribute name, the FlowFile attributes to enqueue ({@code null} for none), the Jolt spec path,
+     * the Jolt transform strategy, whether success is expected, and the expected output file name
+     * (only consulted when success is expected).
+     */
+    private static Stream<Arguments> provideJsonSourceAttributeArguments() {
+        // Malformed JSON: the "quality.value" member has no value, so parsing must fail.
+        final String invalidInputJson =
+                "{\"rating\":{\"primary\":{\"value\":3},\"series\":{\"value\":[5,4]},\"quality\":{\"value\":}}}";
+        // Well-formed JSON used as the attribute INPUT for the success case; the expected
+        // transformed result is read from CHAINR_JSON_OUTPUT, not from this string.
+        final String validInputJson =
+                "{\"rating\":{\"primary\":{\"value\":3},\"series\":{\"value\":[5,4]},\"quality\":{\"value\":3}}}";
+
+        return Stream.of(
+                Arguments.argumentSet("testJsonAttributeNotInitialised", JSON_SOURCE_ATTR_NAME, null,
+                        SHIFTR_SPEC_PATH, JoltTransformStrategy.SHIFTR, false, null),
+                Arguments.argumentSet("testInvalidJsonAttribute", JSON_SOURCE_ATTR_NAME,
+                        Map.of(JSON_SOURCE_ATTR_NAME, invalidInputJson),
+                        SHIFTR_SPEC_PATH, JoltTransformStrategy.SHIFTR, false, null),
+                Arguments.argumentSet("testValidJsonAttribute", JSON_SOURCE_ATTR_NAME,
+                        Map.of(JSON_SOURCE_ATTR_NAME, validInputJson),
+                        CHAINR_SPEC_PATH, JoltTransformStrategy.CHAINR, true, CHAINR_JSON_OUTPUT)
+        );
+    }
+
+ /**
+ * Runs the processor with JSON Source set to ATTRIBUTE. JSON_INPUT is still enqueued as content,
+ * but in this mode only the configured attribute is parsed. On expected success the transformed
+ * attribute is compared with the expected output file; otherwise the FlowFile must go to failure.
+ *
+ * @param jsonSourceAttribute value for the JSON Source Attribute property
+ * @param flowFileAttributes attributes for the enqueued FlowFile, or {@code null} for none
+ * @param joltSpec Jolt specification file path, set directly as the Jolt Specification property
+ * @param joltStrategy Jolt transform strategy to configure
+ * @param expectSuccess whether the FlowFile is expected to be routed to success
+ * @param expectedOutputFile expected-output file name, used only when expectSuccess is true
+ */
+ @ParameterizedTest
+ @MethodSource("provideJsonSourceAttributeArguments")
+ void testJsonSourceAttribute(String jsonSourceAttribute,
+ Map<String, String> flowFileAttributes,
+ String joltSpec,
+ JoltTransformStrategy joltStrategy,
+ boolean expectSuccess,
+ String expectedOutputFile) throws IOException
{
+ runner.setProperty(JoltTransformJSON.JOLT_SPEC, joltSpec);
+ runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, joltStrategy);
+ runner.setProperty(JoltTransformJSON.JSON_SOURCE,
JsonSourceStrategy.ATTRIBUTE);
+ runner.setProperty(JoltTransformJSON.JSON_SOURCE_ATTRIBUTE,
jsonSourceAttribute);
+ // A null attribute map means the source attribute is absent from the FlowFile.
+ runner.enqueue(JSON_INPUT, flowFileAttributes != null ?
flowFileAttributes : Collections.emptyMap());
+ runner.run();
+
+ if (expectSuccess) {
+ assertTransformedJsonAttributeEquals(expectedOutputFile);
+ } else {
+
runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_FAILURE);
+ }
+ }
+
+    /**
+     * Asserts that all FlowFiles were routed to success and that the JSON held in the
+     * {@code JSON_SOURCE_ATTR_NAME} attribute of the first one matches the expected JSON file.
+     *
+     * @param expectedOutputFilename name of a file under {@code src/test/resources/TestJoltTransformJson}
+     *                               containing the expected transformed JSON
+     * @throws IOException if the expected output file cannot be read
+     */
+    private void assertTransformedJsonAttributeEquals(final String expectedOutputFilename) throws IOException {
+        runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
+
+        final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).getFirst();
+        transformed.assertAttributeExists(JSON_SOURCE_ATTR_NAME);
+
+        final Object transformedJson = JsonUtils.jsonToObject(transformed.getAttribute(JSON_SOURCE_ATTR_NAME));
+
+        final String compareOutputPath = "src/test/resources/TestJoltTransformJson/%s".formatted(expectedOutputFilename);
+        // Read the whole file as a String so this method never opens an InputStream it would have to close.
+        final Object compareJson = JsonUtils.jsonToObject(Files.readString(Paths.get(compareOutputPath)));
+        assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
+    }
+
private void assertTransformedEquals(final String expectedOutputFilename)
throws IOException {
runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);