This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 8351e4c97ed NIFI-15716 Added JSON Lines support to ValidateJson
(#11012)
8351e4c97ed is described below
commit 8351e4c97ed227676158b5d64a68dc29ea26034e
Author: dan-s1 <[email protected]>
AuthorDate: Mon Mar 30 16:45:02 2026 -0400
NIFI-15716 Added JSON Lines support to ValidateJson (#11012)
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/processors/standard/ValidateJson.java | 141 +++++++++++++++++----
.../nifi/processors/standard/TestValidateJson.java | 43 ++++++-
2 files changed, 153 insertions(+), 31 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java
index 394904a4be1..2dd5225b1b9 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java
@@ -62,6 +62,9 @@ import
org.apache.nifi.schemaregistry.services.JsonSchemaRegistry;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -79,9 +82,10 @@ import java.util.stream.Collectors;
@WritesAttribute(attribute = ValidateJson.ERROR_ATTRIBUTE_KEY, description
= "If the flow file is routed to the invalid relationship "
+ ", this attribute will contain the error message resulting from
the validation failure.")
})
-@CapabilityDescription("Validates the contents of FlowFiles against a
configurable JSON Schema. See json-schema.org for specification standards. " +
- "This Processor does not support input containing multiple JSON
objects, such as newline-delimited JSON. If the input FlowFile contains " +
- "newline-delimited JSON, only the first line will be validated."
+@CapabilityDescription("""
+ Validates the contents of FlowFiles against a configurable JSON
Schema. See json-schema.org for specification standards.
+ This Processor supports input containing multiple JSON objects using
newline-delimited JSON based on configuration properties,
+ otherwise if the input FlowFile contains newline-delimited JSON, only
the first line will be validated."""
)
@SystemResourceConsideration(resource = SystemResource.MEMORY, description =
"Validating JSON requires reading FlowFile content into memory")
@Restricted(
@@ -125,7 +129,35 @@ public class ValidateJson extends AbstractProcessor {
}
}
- protected static final String ERROR_ATTRIBUTE_KEY =
"json.validation.errors";
+    /**
+     * Layouts of FlowFile content that the processor can validate. Each constant carries the
+     * display name and description exposed through {@link DescribedValue}; the property value
+     * is the constant name itself.
+     */
+    enum InputFormat implements DescribedValue {
+        FLOW_FILE("FlowFile", "Validation applied to FlowFile content containing JSON"),
+        JSON_LINES("JSON Lines", "Validation applied to FlowFile content containing JSON Lines or NDJSON");
+
+        private final String displayName;
+        private final String description;
+
+        InputFormat(final String label, final String summary) {
+            displayName = label;
+            description = summary;
+        }
+
+        /** @return the human-readable description of this format */
+        @Override
+        public String getDescription() {
+            return description;
+        }
+
+        /** @return the label of this format */
+        @Override
+        public String getDisplayName() {
+            return displayName;
+        }
+
+        /** @return the enum constant name, used as the property value */
+        @Override
+        public String getValue() {
+            return name();
+        }
+    }
+
+ static final String ERROR_ATTRIBUTE_KEY = "json.validation.errors";
private static final String SCHEMA_NAME_PROPERTY_NAME = "Schema Name";
private static final String SCHEMA_CONTENT_PROPERTY_NAME = "JSON Schema";
private static final String DEFAULT_MAX_STRING_LENGTH = "20 MB";
@@ -165,6 +197,14 @@ public class ValidateJson extends AbstractProcessor {
.dependsOn(SCHEMA_ACCESS_STRATEGY,
JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY)
.build();
+ public static final PropertyDescriptor INPUT_FORMAT = new
PropertyDescriptor.Builder() // Read in onTrigger to choose between validateFlowFile and validateJsonLines
+ .name("Input Format")
+ .description("Specifies the expected format of FlowFile content
containing one or more JSON elements")
+ .allowableValues(InputFormat.class) // Choices are the InputFormat constants: FLOW_FILE and JSON_LINES
+ .defaultValue(InputFormat.FLOW_FILE) // Default dispatches to validateFlowFile, which parses the content with a single readTree call
+ .required(true)
+ .build();
+
public static final PropertyDescriptor MAX_STRING_LENGTH = new
PropertyDescriptor.Builder()
.name("Max String Length")
.description("The maximum allowed length of a string value when
parsing the JSON document")
@@ -184,6 +224,7 @@ public class ValidateJson extends AbstractProcessor {
SCHEMA_REGISTRY,
SCHEMA_CONTENT,
SCHEMA_VERSION,
+ INPUT_FORMAT,
MAX_STRING_LENGTH
);
@@ -298,16 +339,27 @@ public class ValidateJson extends AbstractProcessor {
}
}
+ if (schema == null) {
+ getLogger().error("JSON schema not configured for {}", flowFile);
+ session.getProvenanceReporter().route(flowFile, REL_FAILURE);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ final InputFormat inputFormat =
context.getProperty(INPUT_FORMAT).asAllowableValue(InputFormat.class);
+ if (inputFormat == InputFormat.FLOW_FILE) {
+ validateFlowFile(session, flowFile);
+ } else {
+ validateJsonLines(session, flowFile);
+ }
+ }
+
+ private void validateFlowFile(final ProcessSession session, final FlowFile
flowFile) {
+ final Schema currentSchema = schema;
+
try (final InputStream in = session.read(flowFile)) {
final JsonNode node = mapper.readTree(in);
- final Schema activeSchema = schema;
- if (activeSchema == null) {
- getLogger().error("JSON schema not configured for {}",
flowFile);
- session.getProvenanceReporter().route(flowFile, REL_FAILURE);
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
- final List<Error> errors = activeSchema.validate(node);
+ final List<Error> errors = currentSchema.validate(node);
if (errors.isEmpty()) {
getLogger().debug("JSON {} valid", flowFile);
@@ -315,10 +367,10 @@ public class ValidateJson extends AbstractProcessor {
session.transfer(flowFile, REL_VALID);
} else {
final String validationMessages = errors.toString();
- flowFile = session.putAttribute(flowFile, ERROR_ATTRIBUTE_KEY,
validationMessages);
+ final FlowFile invalidJsonFlowFile =
session.putAttribute(flowFile, ERROR_ATTRIBUTE_KEY, validationMessages);
getLogger().warn("JSON {} invalid: Validation Errors {}",
flowFile, validationMessages);
- session.getProvenanceReporter().route(flowFile, REL_INVALID);
- session.transfer(flowFile, REL_INVALID);
+ session.getProvenanceReporter().route(invalidJsonFlowFile,
REL_INVALID);
+ session.transfer(invalidJsonFlowFile, REL_INVALID);
}
} catch (final Exception e) {
getLogger().error("JSON processing failed {}", flowFile, e);
@@ -327,24 +379,57 @@ public class ValidateJson extends AbstractProcessor {
}
}
+ private void validateJsonLines(final ProcessSession session, final
FlowFile flowFile) {
+ final Schema currentSchema = schema;
+
+ try (final InputStream in = session.read(flowFile);
+ final LineNumberReader reader = new LineNumberReader(new
InputStreamReader(in, StandardCharsets.UTF_8))) {
+
+ String line;
+
+ while ((line = reader.readLine()) != null) {
+ if (line.isBlank()) {
+ continue;
+ }
+
+ final JsonNode node = mapper.readTree(line);
+ final List<Error> errors = currentSchema.validate(node);
+
+ if (!errors.isEmpty()) {
+ reader.close(); // NOTE: Must call close otherwise get
IllegalStateException indicating FlowFile already in use
+ // by an active callback or InputStream created by
ProcessSession.read(FlowFile) has not been closed
+ final String validationMessages = errors.toString();
+ final String validationErrMsg = "JSON at line %s is
invalid: %s".formatted(reader.getLineNumber(), validationMessages);
+ final FlowFile invalidJsonFlowFile =
session.putAttribute(flowFile, ERROR_ATTRIBUTE_KEY, validationErrMsg);
+ getLogger().warn("JSON at line {} in {} is invalid:
Validation Errors {}", reader.getLineNumber(), flowFile, validationMessages);
+ session.getProvenanceReporter().route(invalidJsonFlowFile,
REL_INVALID);
+ session.transfer(invalidJsonFlowFile, REL_INVALID);
+ return;
+ }
+ }
+
+ session.getProvenanceReporter().route(flowFile, REL_VALID);
+ session.transfer(flowFile, REL_VALID);
+
+ } catch (final Exception e) {
+ getLogger().error("{} processing failed {}",
InputFormat.JSON_LINES.getDisplayName(), flowFile, e);
+ session.getProvenanceReporter().route(flowFile, REL_FAILURE);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+
private String getPropertyValidateMessage(JsonSchemaStrategy
schemaAccessStrategy, PropertyDescriptor property) {
return "The '" + schemaAccessStrategy.getValue() + "' Schema Access
Strategy requires that the " + property.getDisplayName() + " property be set.";
}
private SpecificationVersion mapToSpecification(final SchemaVersion
schemaVersion) {
- switch (schemaVersion) {
- case DRAFT_4:
- return SpecificationVersion.DRAFT_4;
- case DRAFT_6:
- return SpecificationVersion.DRAFT_6;
- case DRAFT_7:
- return SpecificationVersion.DRAFT_7;
- case DRAFT_2019_09:
- return SpecificationVersion.DRAFT_2019_09;
- case DRAFT_2020_12:
- return SpecificationVersion.DRAFT_2020_12;
- }
- throw new IllegalArgumentException("Unsupported schema version: " +
schemaVersion);
+ return switch (schemaVersion) { // Each SchemaVersion constant maps to the SpecificationVersion constant of the same name
+ case DRAFT_4 -> SpecificationVersion.DRAFT_4;
+ case DRAFT_6 -> SpecificationVersion.DRAFT_6;
+ case DRAFT_7 -> SpecificationVersion.DRAFT_7;
+ case DRAFT_2019_09 -> SpecificationVersion.DRAFT_2019_09;
+ case DRAFT_2020_12 -> SpecificationVersion.DRAFT_2020_12;
+ }; // No default branch: this switch expression replaces the removed statement form and its trailing IllegalArgumentException
}
private JsonSchemaStrategy getSchemaAccessStrategy(PropertyContext
context) {
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java
index b9d6810f198..4cd772620db 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java
@@ -49,6 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class TestValidateJson {
private static final String JSON = getFileContent("simple-example.json");
@@ -62,7 +63,7 @@ class TestValidateJson {
runner = TestRunners.newTestRunner(ValidateJson.class);
}
- @ParameterizedTest(name = "{2}")
+ @ParameterizedTest
@MethodSource("customValidateArgs")
void testCustomValidateMissingProperty(final
ValidateJson.JsonSchemaStrategy strategy) {
runner.setProperty(ValidateJson.SCHEMA_ACCESS_STRATEGY, strategy);
@@ -252,6 +253,35 @@ class TestValidateJson {
runner.assertTransferCount(ValidateJson.REL_VALID, 1);
}
+ @ParameterizedTest
+ @MethodSource("multilineJsonArgs")
+ void testMultilineJsonWhereSecondLineInvalid(ValidateJson.InputFormat
inputFormat, boolean expectedValid) { // Runs one two-line payload under each input format; line 2 is presumably the schema violation (FieldTwo is a string there)
+ final String multilineJson = """
+
{"FieldOne":"stringValue","FieldTwo":1234,"FieldThree":[{"arrayField":"arrayValue"}]}
+
{"FieldOne":"stringValue","FieldTwo":"NAN","FieldThree":[{"arrayField":"arrayValue"}]}
+ """;
+ runner.setProperty(ValidateJson.SCHEMA_CONTENT, SIMPLE_SCHEMA);
+ runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION,
SCHEMA_VERSION);
+ runner.setProperty(ValidateJson.INPUT_FORMAT, inputFormat.getValue());
+ runner.enqueue(multilineJson);
+
+ runner.run();
+
+ runner.assertTransferCount(ValidateJson.REL_FAILURE, 0); // Neither format may treat the payload as a processing failure
+ if (expectedValid) { // FLOW_FILE case per multilineJsonArgs: the FlowFile is expected on REL_VALID
+ runner.assertTransferCount(ValidateJson.REL_INVALID, 0);
+ runner.assertTransferCount(ValidateJson.REL_VALID, 1);
+ } else { // JSON_LINES case: the FlowFile is expected on REL_INVALID with a warning that names line 2
+ runner.assertTransferCount(ValidateJson.REL_INVALID, 1);
+ runner.assertTransferCount(ValidateJson.REL_VALID, 0);
+
+ assertTrue(runner.getLogger().getWarnMessages().stream()
+ .anyMatch(logMessage -> logMessage.getMsg().contains("JSON
at line 2") && logMessage.getMsg().contains("is invalid")));
+ }
+
+ runner.clearTransferState();
+ }
+
private void assertValidationErrors(Relationship relationship, boolean
expected) {
final Map<String, String> attributes =
runner.getFlowFilesForRelationship(relationship).getFirst().getAttributes();
@@ -265,8 +295,15 @@ class TestValidateJson {
private static Stream<Arguments> customValidateArgs() {
return Stream.of(
-
Arguments.of(ValidateJson.JsonSchemaStrategy.SCHEMA_NAME_PROPERTY, "requires
that the JSON Schema Registry property be set"),
-
Arguments.of(ValidateJson.JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY, "requires
that the JSON Schema property be set")
+ Arguments.argumentSet("Require JSON Schema Registry property
to be set", ValidateJson.JsonSchemaStrategy.SCHEMA_NAME_PROPERTY), // First argument names the set; the strategy is the only value passed to the test method
+ Arguments.argumentSet("Require JSON Schema property to be
set", ValidateJson.JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY)
+ );
+ }
+
+    /**
+     * Argument sets for testMultilineJsonWhereSecondLineInvalid: the input format under test and
+     * whether the two-line payload is expected to be routed as valid. The enum constant is passed
+     * directly so it matches the test's InputFormat parameter without a String-to-enum conversion.
+     */
+    private static Stream<Arguments> multilineJsonArgs() {
+        return Stream.of(
+                Arguments.argumentSet(ValidateJson.InputFormat.FLOW_FILE.getDisplayName(), ValidateJson.InputFormat.FLOW_FILE, true),
+                Arguments.argumentSet(ValidateJson.InputFormat.JSON_LINES.getDisplayName(), ValidateJson.InputFormat.JSON_LINES, false)
);
}