NIFI-476: Addressing the handling of null values in JsonPath-related processors and providing configuration to treat them as an empty string or the string "null".
Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/fb2206a9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/fb2206a9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/fb2206a9 Branch: refs/heads/NIFI-271 Commit: fb2206a9bb5653196e5dc673f7b71ca0f8bc87b9 Parents: c5b961b Author: Aldrin Piri <[email protected]> Authored: Fri Apr 17 23:30:26 2015 -0400 Committer: Aldrin Piri <[email protected]> Committed: Fri Apr 17 23:30:26 2015 -0400 ---------------------------------------------------------------------- .../standard/AbstractJsonPathProcessor.java | 25 +++++- .../processors/standard/EvaluateJsonPath.java | 36 ++++---- .../nifi/processors/standard/SplitJson.java | 5 +- .../standard/TestEvaluateJsonPath.java | 86 +++++++++++++++++++ .../nifi/processors/standard/TestSplitJson.java | 88 ++++++++++++++++++++ 5 files changed, 217 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb2206a9/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java index 94a299e..a80ee0f 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java +++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java @@ -22,6 +22,7 @@ import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.internal.spi.json.JsonSmartJsonProvider; import com.jayway.jsonpath.spi.json.JsonProvider; import net.minidev.json.parser.JSONParser; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; @@ -35,8 +36,10 @@ import org.apache.nifi.util.ObjectHolder; import java.io.IOException; import java.io.InputStream; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; /** * Provides common functionality used for processors interacting and manipulating JSON data via JsonPath. @@ -51,6 +54,24 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor { private static final JsonProvider JSON_PROVIDER = STRICT_PROVIDER_CONFIGURATION.jsonProvider(); + static final Map<String, String> NULL_REPRESENTATION_MAP = new HashMap<>(); + + static final String EMPTY_STRING_OPTION = "empty string"; + static final String NULL_STRING_OPTION = "the string 'null'"; + + static { + NULL_REPRESENTATION_MAP.put(EMPTY_STRING_OPTION, ""); + NULL_REPRESENTATION_MAP.put(NULL_STRING_OPTION, "null"); + } + + public static final PropertyDescriptor NULL_VALUE_DEFAULT_REPRESENTATION = new PropertyDescriptor.Builder() + .name("Null Value Representation") + .description("Indicates the desired representation of JSON Path expressions resulting in a null value.") + .required(true) + .allowableValues(NULL_REPRESENTATION_MAP.keySet()) + .defaultValue(EMPTY_STRING_OPTION) + .build(); + static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile) { // Parse the document once into an associated context to support multiple path evaluations 
if specified final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null); @@ -79,9 +100,9 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor { return !(obj instanceof Map || obj instanceof List); } - static String getResultRepresentation(Object jsonPathResult) { + static String getResultRepresentation(Object jsonPathResult, String defaultValue) { if (isJsonScalar(jsonPathResult)) { - return jsonPathResult.toString(); + return Objects.toString(jsonPathResult, defaultValue); } return JSON_PROVIDER.toJson(jsonPathResult); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb2206a9/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java index 229b342..f214178 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java @@ -16,20 +16,10 @@ */ package org.apache.nifi.processors.standard; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - +import com.jayway.jsonpath.DocumentContext; +import 
com.jayway.jsonpath.InvalidJsonException; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.PathNotFoundException; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; @@ -52,10 +42,12 @@ import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.util.ObjectHolder; -import com.jayway.jsonpath.DocumentContext; -import com.jayway.jsonpath.InvalidJsonException; -import com.jayway.jsonpath.JsonPath; -import com.jayway.jsonpath.PathNotFoundException; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; @EventDriven @SideEffectFree @@ -72,7 +64,7 @@ import com.jayway.jsonpath.PathNotFoundException; + "If Destination is 'flowfile-content' and the JsonPath does not evaluate to a defined path, the FlowFile will be routed to 'unmatched' without having its contents modified. " + "If Destination is flowfile-attribute and the expression matches nothing, attributes will be created with " + "empty strings as the value, and the FlowFile will always be routed to 'matched.'") -@DynamicProperty(name="A FlowFile attribute(if <Destination> is set to 'flowfile-attribute')", value="A JsonPath expression", description="If <Destination>='flowfile-attribute' then that FlowFile attribute " + +@DynamicProperty(name = "A FlowFile attribute(if <Destination> is set to 'flowfile-attribute')", value = "A JsonPath expression", description = "If <Destination>='flowfile-attribute' then that FlowFile attribute " + "will be set to any JSON objects that match the JsonPath. 
If <Destination>='flowfile-content' then the FlowFile content will be updated to any JSON objects that match the JsonPath.") public class EvaluateJsonPath extends AbstractJsonPathProcessor { @@ -119,6 +111,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { final List<PropertyDescriptor> properties = new ArrayList<>(); properties.add(DESTINATION); properties.add(RETURN_TYPE); + properties.add(NULL_VALUE_DEFAULT_REPRESENTATION); this.properties = Collections.unmodifiableList(properties); } @@ -211,6 +204,9 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { final ProcessorLog logger = getLogger(); + String representationOption = processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue(); + final String nullDefaultValue = NULL_REPRESENTATION_MAP.get(representationOption); + /* Build the JsonPath expressions from attributes */ final Map<String, JsonPath> attributeToJsonPathMap = new HashMap<>(); @@ -265,7 +261,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor { } } - final String resultRepresentation = getResultRepresentation(resultHolder.get()); + final String resultRepresentation = getResultRepresentation(resultHolder.get(), nullDefaultValue); switch (destination) { case DESTINATION_ATTRIBUTE: jsonPathResults.put(jsonPathAttrKey, resultRepresentation); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb2206a9/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java index 4d79746..bba770a 100644 --- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java @@ -74,6 +74,7 @@ public class SplitJson extends AbstractJsonPathProcessor { protected void init(final ProcessorInitializationContext context) { final List<PropertyDescriptor> properties = new ArrayList<>(); properties.add(ARRAY_JSON_PATH_EXPRESSION); + properties.add(NULL_VALUE_DEFAULT_REPRESENTATION); this.properties = Collections.unmodifiableList(properties); final Set<Relationship> relationships = new HashSet<>(); @@ -142,6 +143,8 @@ public class SplitJson extends AbstractJsonPathProcessor { } final JsonPath jsonPath = JSON_PATH_REF.get(); + String representationOption = processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue(); + final String nullDefaultValue = NULL_REPRESENTATION_MAP.get(representationOption); final List<FlowFile> segments = new ArrayList<>(); @@ -168,7 +171,7 @@ public class SplitJson extends AbstractJsonPathProcessor { split = processSession.write(split, new OutputStreamCallback() { @Override public void process(OutputStream out) throws IOException { - String resultSegmentContent = getResultRepresentation(resultSegment); + String resultSegmentContent = getResultRepresentation(resultSegment, nullDefaultValue); out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8)); } }); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb2206a9/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java index 058e21c..5fa8f4d 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEvaluateJsonPath.java @@ -16,7 +16,11 @@ */ package org.apache.nifi.processors.standard; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.StringUtils; import org.apache.nifi.util.TestRunner; @@ -24,9 +28,14 @@ import org.apache.nifi.util.TestRunners; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; +import static org.junit.Assert.assertEquals; + public class TestEvaluateJsonPath { private static final Path JSON_SNIPPET = Paths.get("src/test/resources/TestJson/json-sample.json"); @@ -261,4 +270,81 @@ public class TestEvaluateJsonPath { testRunner.getFlowFilesForRelationship(expectedRel).get(0).assertContentEquals(JSON_SNIPPET); } + @Test + public void testNullInput() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath()); + testRunner.setProperty(EvaluateJsonPath.RETURN_TYPE, EvaluateJsonPath.RETURN_TYPE_JSON); + testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_ATTRIBUTE); + testRunner.setProperty("stringField", "$.stringField"); + testRunner.setProperty("missingField", "$.missingField"); + 
testRunner.setProperty("nullField", "$.nullField"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + ff = session.write(ff, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + try (OutputStream outputStream = new BufferedOutputStream(out)) { + outputStream.write("{\"stringField\": \"String Value\", \"nullField\": null}".getBytes(StandardCharsets.UTF_8)); + } + } + }); + + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.assertTransferCount(EvaluateJsonPath.REL_MATCH, 1); + + FlowFile output = testRunner.getFlowFilesForRelationship(EvaluateJsonPath.REL_MATCH).get(0); + + String validFieldValue = output.getAttribute("stringField"); + assertEquals("String Value", validFieldValue); + + String missingValue = output.getAttribute("missingField"); + assertEquals("Missing Value", "", missingValue); + + String nullValue = output.getAttribute("nullField"); + assertEquals("Null Value", "", nullValue); + } + + @Test + public void testNullInput_nullStringRepresentation() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath()); + testRunner.setProperty(EvaluateJsonPath.RETURN_TYPE, EvaluateJsonPath.RETURN_TYPE_JSON); + testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_ATTRIBUTE); + testRunner.setProperty(EvaluateJsonPath.NULL_VALUE_DEFAULT_REPRESENTATION, AbstractJsonPathProcessor.NULL_STRING_OPTION); + testRunner.setProperty("stringField", "$.stringField"); + testRunner.setProperty("missingField", "$.missingField"); + testRunner.setProperty("nullField", "$.nullField"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + ff = session.write(ff, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + try (OutputStream outputStream = new 
BufferedOutputStream(out)) { + outputStream.write("{\"stringField\": \"String Value\", \"nullField\": null}".getBytes(StandardCharsets.UTF_8)); + } + } + }); + + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.assertTransferCount(EvaluateJsonPath.REL_MATCH, 1); + + FlowFile output = testRunner.getFlowFilesForRelationship(EvaluateJsonPath.REL_MATCH).get(0); + + String validFieldValue = output.getAttribute("stringField"); + assertEquals("String Value", validFieldValue); + + String missingValue = output.getAttribute("missingField"); + assertEquals("Missing Value", "", missingValue); + + String nullValue = output.getAttribute("nullField"); + assertEquals("Null Value", "null", nullValue); + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb2206a9/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java index f47467f..fc07386 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java @@ -16,13 +16,20 @@ */ package org.apache.nifi.processors.standard; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; 
import org.apache.nifi.util.TestRunners; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; @@ -123,4 +130,85 @@ public class TestSplitJson { testRunner.assertTransferCount(SplitJson.REL_FAILURE, 1); testRunner.getFlowFilesForRelationship(SplitJson.REL_FAILURE).get(0).assertContentEquals(JSON_SNIPPET); } + + @Test + public void testSplit_pathToNullValue() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson()); + testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$.nullField"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + ff = session.write(ff, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + try (OutputStream outputStream = new BufferedOutputStream(out)) { + outputStream.write("{\"stringField\": \"String Value\", \"nullField\": null}".getBytes(StandardCharsets.UTF_8)); + } + } + }); + + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.assertTransferCount(SplitJson.REL_FAILURE, 1); + } + + @Test + public void testSplit_pathToArrayWithNulls_emptyStringRepresentation() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson()); + testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$.arrayOfNulls"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + ff = session.write(ff, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + try (OutputStream outputStream = new BufferedOutputStream(out)) { + outputStream.write("{\"stringField\": \"String Value\", \"arrayOfNulls\": [null, null, null]}".getBytes(StandardCharsets.UTF_8)); + } + } + }); + + testRunner.enqueue(ff); + 
testRunner.run(); + + /* assert that three files were transferred to split and each is empty */ + int expectedFiles = 3; + testRunner.assertTransferCount(SplitJson.REL_SPLIT, expectedFiles); + for (int i = 0; i < expectedFiles; i++) { + testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(i).assertContentEquals(""); + } + } + + @Test + public void testSplit_pathToArrayWithNulls_nullStringRepresentation() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson()); + testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$.arrayOfNulls"); + testRunner.setProperty(SplitJson.NULL_VALUE_DEFAULT_REPRESENTATION, + AbstractJsonPathProcessor.NULL_STRING_OPTION); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + ff = session.write(ff, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + try (OutputStream outputStream = new BufferedOutputStream(out)) { + outputStream.write("{\"stringField\": \"String Value\", \"arrayOfNulls\": [null, null, null]}".getBytes(StandardCharsets.UTF_8)); + } + } + }); + + testRunner.enqueue(ff); + testRunner.run(); + + /* assert that three files were transferred to split and each has the word null in it */ + int expectedFiles = 3; + testRunner.assertTransferCount(SplitJson.REL_SPLIT, expectedFiles); + for (int i = 0; i < expectedFiles; i++) { + testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(i).assertContentEquals("null"); + } + } }
