This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 850959dd17 NIFI-12127 Allow Jackson's max string length to be
configured on SplitJson and EvaluateJsonPath
850959dd17 is described below
commit 850959dd17b275cbaf1191bbdcbcfb4e0f6e9355
Author: Bryan Bende <[email protected]>
AuthorDate: Tue Sep 26 09:46:22 2023 -0400
NIFI-12127 Allow Jackson's max string length to be configured on SplitJson
and EvaluateJsonPath
This closes #7794
Signed-off-by: Mike Thomsen <[email protected]>
---
.../standard/AbstractJsonPathProcessor.java | 51 +++++++++++++++-------
.../nifi/processors/standard/EvaluateJsonPath.java | 11 ++++-
.../apache/nifi/processors/standard/SplitJson.java | 43 ++++++++++--------
3 files changed, 69 insertions(+), 36 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
index 81c7aa8ec6..aa72654b99 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
@@ -16,20 +16,14 @@
*/
package org.apache.nifi.processors.standard;
+import com.fasterxml.jackson.core.StreamReadConstraints;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.InvalidJsonException;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
import com.jayway.jsonpath.spi.json.JsonProvider;
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -38,8 +32,18 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
/**
* Provides common functionality used for processors interacting and
manipulating JSON data via JsonPath.
*
@@ -49,10 +53,6 @@ import org.apache.nifi.util.StringUtils;
*/
public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
- private static final Configuration STRICT_PROVIDER_CONFIGURATION =
Configuration.builder().jsonProvider(new JacksonJsonProvider()).build();
-
- private static final JsonProvider JSON_PROVIDER =
STRICT_PROVIDER_CONFIGURATION.jsonProvider();
-
static final Map<String, String> NULL_REPRESENTATION_MAP = new HashMap<>();
static final String EMPTY_STRING_OPTION = "empty string";
@@ -71,14 +71,33 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
.defaultValue(EMPTY_STRING_OPTION)
.build();
- static DocumentContext validateAndEstablishJsonContext(ProcessSession
processSession, FlowFile flowFile) {
+ public static final PropertyDescriptor MAX_STRING_LENGTH = new
PropertyDescriptor.Builder()
+ .name("max-string-length")
+ .displayName("Max String Length")
+ .description("The maximum allowed length of a string value when
parsing the JSON document")
+ .required(true)
+ .defaultValue("20 MB")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+
+ static Configuration createConfiguration(final int maxStringLength) {
+ final StreamReadConstraints streamReadConstraints =
StreamReadConstraints.builder().maxStringLength(maxStringLength).build();
+
+ final ObjectMapper objectMapper = new ObjectMapper();
+
objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints);
+
+ final JsonProvider jsonProvider = new
JacksonJsonProvider(objectMapper);
+ return Configuration.builder().jsonProvider(jsonProvider).build();
+ }
+
+ static DocumentContext validateAndEstablishJsonContext(ProcessSession
processSession, FlowFile flowFile, Configuration jsonPathConfiguration) {
// Parse the document once into an associated context to support
multiple path evaluations if specified
final AtomicReference<DocumentContext> contextHolder = new
AtomicReference<>(null);
processSession.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
try (BufferedInputStream bufferedInputStream = new
BufferedInputStream(in)) {
- DocumentContext ctx =
JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(bufferedInputStream);
+ DocumentContext ctx =
JsonPath.using(jsonPathConfiguration).parse(bufferedInputStream);
contextHolder.set(ctx);
} catch (IllegalArgumentException iae) {
// The JsonPath.parse() above first parses the json, then
creates a context object from the parsed
@@ -109,11 +128,11 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
return !(obj instanceof Map || obj instanceof List);
}
- static String getResultRepresentation(Object jsonPathResult, String
defaultValue) {
+ static String getResultRepresentation(JsonProvider jsonProvider, Object
jsonPathResult, String defaultValue) {
if (isJsonScalar(jsonPathResult)) {
return Objects.toString(jsonPathResult, defaultValue);
}
- return JSON_PROVIDER.toJson(jsonPathResult);
+ return jsonProvider.toJson(jsonPathResult);
}
abstract static class JsonPathValidator implements Validator {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
index 7075e0c85f..b72a8718db 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.standard;
+import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.InvalidJsonException;
import com.jayway.jsonpath.JsonPath;
@@ -38,6 +39,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
@@ -149,6 +151,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
private volatile String returnType;
private volatile String pathNotFound;
private volatile String nullDefaultValue;
+ private volatile Configuration jsonPathConfiguration;
@Override
protected void init(final ProcessorInitializationContext context) {
@@ -163,6 +166,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
props.add(RETURN_TYPE);
props.add(PATH_NOT_FOUND);
props.add(NULL_VALUE_DEFAULT_REPRESENTATION);
+ props.add(MAX_STRING_LENGTH);
this.properties = Collections.unmodifiableList(props);
}
@@ -250,6 +254,9 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
}
pathNotFound = processContext.getProperty(PATH_NOT_FOUND).getValue();
nullDefaultValue =
NULL_REPRESENTATION_MAP.get(processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue());
+
+ final int maxStringLength =
processContext.getProperty(MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue();
+ jsonPathConfiguration = createConfiguration(maxStringLength);
}
@OnUnscheduled
@@ -268,7 +275,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
DocumentContext documentContext;
try {
- documentContext = validateAndEstablishJsonContext(processSession,
flowFile);
+ documentContext = validateAndEstablishJsonContext(processSession,
flowFile, jsonPathConfiguration);
} catch (InvalidJsonException e) {
logger.error("FlowFile {} did not have valid JSON content.",
flowFile);
processSession.transfer(flowFile, REL_FAILURE);
@@ -318,7 +325,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
}
}
- final String resultRepresentation =
getResultRepresentation(result, nullDefaultValue);
+ final String resultRepresentation =
getResultRepresentation(jsonPathConfiguration.jsonProvider(), result,
nullDefaultValue);
if (destinationIsAttribute) {
jsonPathResults.put(jsonPathAttrKey, resultRepresentation);
} else {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
index 21166f31bb..e2b1f1b11a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
@@ -16,18 +16,11 @@
*/
package org.apache.nifi.processors.standard;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.InvalidJsonException;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.PathNotFoundException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
@@ -36,6 +29,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -47,16 +41,24 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
-import com.jayway.jsonpath.DocumentContext;
-import com.jayway.jsonpath.InvalidJsonException;
-import com.jayway.jsonpath.JsonPath;
-import com.jayway.jsonpath.PathNotFoundException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
import static
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
import static
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
@@ -114,12 +116,14 @@ public class SplitJson extends AbstractJsonPathProcessor {
private final AtomicReference<JsonPath> JSON_PATH_REF = new
AtomicReference<>();
private volatile String nullDefaultValue;
+ private volatile Configuration jsonPathConfiguration;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ARRAY_JSON_PATH_EXPRESSION);
properties.add(NULL_VALUE_DEFAULT_REPRESENTATION);
+ properties.add(MAX_STRING_LENGTH);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
@@ -170,6 +174,9 @@ public class SplitJson extends AbstractJsonPathProcessor {
@OnScheduled
public void onScheduled(ProcessContext processContext) {
nullDefaultValue =
NULL_REPRESENTATION_MAP.get(processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue());
+
+ final int maxStringLength =
processContext.getProperty(MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue();
+ jsonPathConfiguration = createConfiguration(maxStringLength);
}
@Override
@@ -183,7 +190,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
DocumentContext documentContext;
try {
- documentContext = validateAndEstablishJsonContext(processSession,
original);
+ documentContext = validateAndEstablishJsonContext(processSession,
original, jsonPathConfiguration);
} catch (InvalidJsonException e) {
logger.error("FlowFile {} did not have valid JSON content.", new
Object[]{original});
processSession.transfer(original, REL_FAILURE);
@@ -218,7 +225,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
Object resultSegment = resultList.get(i);
FlowFile split = processSession.create(original);
split = processSession.write(split, (out) -> {
- String resultSegmentContent =
getResultRepresentation(resultSegment, nullDefaultValue);
+ String resultSegmentContent =
getResultRepresentation(jsonPathConfiguration.jsonProvider(), resultSegment,
nullDefaultValue);
out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
}
);