This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 8351e4c97ed NIFI-15716 Added JSON Lines support to ValidateJson (#11012)
8351e4c97ed is described below

commit 8351e4c97ed227676158b5d64a68dc29ea26034e
Author: dan-s1 <[email protected]>
AuthorDate: Mon Mar 30 16:45:02 2026 -0400

    NIFI-15716 Added JSON Lines support to ValidateJson (#11012)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi/processors/standard/ValidateJson.java     | 141 +++++++++++++++++----
 .../nifi/processors/standard/TestValidateJson.java |  43 ++++++-
 2 files changed, 153 insertions(+), 31 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java
index 394904a4be1..2dd5225b1b9 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java
@@ -62,6 +62,9 @@ import 
org.apache.nifi.schemaregistry.services.JsonSchemaRegistry;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -79,9 +82,10 @@ import java.util.stream.Collectors;
     @WritesAttribute(attribute = ValidateJson.ERROR_ATTRIBUTE_KEY, description 
= "If the flow file is routed to the invalid relationship "
             + ", this attribute will contain the error message resulting from 
the validation failure.")
 })
-@CapabilityDescription("Validates the contents of FlowFiles against a 
configurable JSON Schema. See json-schema.org for specification standards. " +
-        "This Processor does not support input containing multiple JSON 
objects, such as newline-delimited JSON. If the input FlowFile contains " +
-        "newline-delimited JSON, only the first line will be validated."
+@CapabilityDescription("""
+        Validates the contents of FlowFiles against a configurable JSON 
Schema. See json-schema.org for specification standards.
+        This Processor supports input containing multiple JSON objects using 
newline-delimited JSON based on configuration properties,
+        otherwise if the input FlowFile contains newline-delimited JSON, only 
the first line will be validated."""
 )
 @SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"Validating JSON requires reading FlowFile content into memory")
 @Restricted(
@@ -125,7 +129,35 @@ public class ValidateJson extends AbstractProcessor {
         }
     }
 
-    protected static final String ERROR_ATTRIBUTE_KEY = 
"json.validation.errors";
+    enum InputFormat implements DescribedValue {
+        FLOW_FILE("FlowFile", "Validation applied to FlowFile content 
containing JSON"),
+        JSON_LINES("JSON Lines", "Validation applied to FlowFile content 
containing JSON Lines or NDJSON");
+
+        private final String displayName;
+        private final String description;
+
+        InputFormat(final String displayName, final String description) {
+            this.displayName = displayName;
+            this.description = description;
+        }
+
+        @Override
+        public String getValue() {
+            return name();
+        }
+
+        @Override
+        public String getDisplayName() {
+            return displayName;
+        }
+
+        @Override
+        public String getDescription() {
+            return description;
+        }
+    }
+
+    static final String ERROR_ATTRIBUTE_KEY = "json.validation.errors";
     private static final String SCHEMA_NAME_PROPERTY_NAME = "Schema Name";
     private static final String SCHEMA_CONTENT_PROPERTY_NAME = "JSON Schema";
     private static final String DEFAULT_MAX_STRING_LENGTH = "20 MB";
@@ -165,6 +197,14 @@ public class ValidateJson extends AbstractProcessor {
             .dependsOn(SCHEMA_ACCESS_STRATEGY, 
JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY)
             .build();
 
+    public static final PropertyDescriptor INPUT_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("Input Format")
+            .description("Specifies the expected format of FlowFile content 
containing one or more JSON elements")
+            .allowableValues(InputFormat.class)
+            .defaultValue(InputFormat.FLOW_FILE)
+            .required(true)
+            .build();
+
     public static final PropertyDescriptor MAX_STRING_LENGTH = new 
PropertyDescriptor.Builder()
             .name("Max String Length")
             .description("The maximum allowed length of a string value when 
parsing the JSON document")
@@ -184,6 +224,7 @@ public class ValidateJson extends AbstractProcessor {
             SCHEMA_REGISTRY,
             SCHEMA_CONTENT,
             SCHEMA_VERSION,
+            INPUT_FORMAT,
             MAX_STRING_LENGTH
     );
 
@@ -298,16 +339,27 @@ public class ValidateJson extends AbstractProcessor {
             }
         }
 
+        if (schema == null) {
+            getLogger().error("JSON schema not configured for {}", flowFile);
+            session.getProvenanceReporter().route(flowFile, REL_FAILURE);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final InputFormat inputFormat = 
context.getProperty(INPUT_FORMAT).asAllowableValue(InputFormat.class);
+        if (inputFormat == InputFormat.FLOW_FILE) {
+            validateFlowFile(session, flowFile);
+        } else {
+            validateJsonLines(session, flowFile);
+        }
+    }
+
+    private void validateFlowFile(final ProcessSession session, final FlowFile 
flowFile) {
+        final Schema currentSchema = schema;
+
         try (final InputStream in = session.read(flowFile)) {
             final JsonNode node = mapper.readTree(in);
-            final Schema activeSchema = schema;
-            if (activeSchema == null) {
-                getLogger().error("JSON schema not configured for {}", 
flowFile);
-                session.getProvenanceReporter().route(flowFile, REL_FAILURE);
-                session.transfer(flowFile, REL_FAILURE);
-                return;
-            }
-            final List<Error> errors = activeSchema.validate(node);
+            final List<Error> errors = currentSchema.validate(node);
 
             if (errors.isEmpty()) {
                 getLogger().debug("JSON {} valid", flowFile);
@@ -315,10 +367,10 @@ public class ValidateJson extends AbstractProcessor {
                 session.transfer(flowFile, REL_VALID);
             } else {
                 final String validationMessages = errors.toString();
-                flowFile = session.putAttribute(flowFile, ERROR_ATTRIBUTE_KEY, 
validationMessages);
+                final FlowFile invalidJsonFlowFile = 
session.putAttribute(flowFile, ERROR_ATTRIBUTE_KEY, validationMessages);
                 getLogger().warn("JSON {} invalid: Validation Errors {}", 
flowFile, validationMessages);
-                session.getProvenanceReporter().route(flowFile, REL_INVALID);
-                session.transfer(flowFile, REL_INVALID);
+                session.getProvenanceReporter().route(invalidJsonFlowFile, 
REL_INVALID);
+                session.transfer(invalidJsonFlowFile, REL_INVALID);
             }
         } catch (final Exception e) {
             getLogger().error("JSON processing failed {}", flowFile, e);
@@ -327,24 +379,57 @@ public class ValidateJson extends AbstractProcessor {
         }
     }
 
+    private void validateJsonLines(final ProcessSession session, final 
FlowFile flowFile) {
+        final Schema currentSchema = schema;
+
+        try (final InputStream in = session.read(flowFile);
+             final LineNumberReader reader = new LineNumberReader(new 
InputStreamReader(in, StandardCharsets.UTF_8))) {
+
+            String line;
+
+            while ((line = reader.readLine()) != null) {
+                if (line.isBlank()) {
+                    continue;
+                }
+
+                final JsonNode node = mapper.readTree(line);
+                final List<Error> errors = currentSchema.validate(node);
+
+                if (!errors.isEmpty()) {
+                    reader.close(); // NOTE: Must call close otherwise get 
IllegalStateException indicating FlowFile already in use
+                    // by an active callback or InputStream created by 
ProcessSession.read(FlowFile) has not been closed
+                    final String validationMessages = errors.toString();
+                    final String validationErrMsg = "JSON at line %s is 
invalid: %s".formatted(reader.getLineNumber(), validationMessages);
+                    final FlowFile invalidJsonFlowFile = 
session.putAttribute(flowFile, ERROR_ATTRIBUTE_KEY, validationErrMsg);
+                    getLogger().warn("JSON at line {} in {} is invalid: 
Validation Errors {}", reader.getLineNumber(), flowFile, validationMessages);
+                    session.getProvenanceReporter().route(invalidJsonFlowFile, 
REL_INVALID);
+                    session.transfer(invalidJsonFlowFile, REL_INVALID);
+                    return;
+                }
+            }
+
+            session.getProvenanceReporter().route(flowFile, REL_VALID);
+            session.transfer(flowFile, REL_VALID);
+
+        } catch (final Exception e) {
+            getLogger().error("{} processing failed {}", 
InputFormat.JSON_LINES.getDisplayName(), flowFile, e);
+            session.getProvenanceReporter().route(flowFile, REL_FAILURE);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
     private String getPropertyValidateMessage(JsonSchemaStrategy 
schemaAccessStrategy, PropertyDescriptor property) {
         return "The '" + schemaAccessStrategy.getValue() + "' Schema Access 
Strategy requires that the " + property.getDisplayName() + " property be set.";
     }
 
     private SpecificationVersion mapToSpecification(final SchemaVersion 
schemaVersion) {
-        switch (schemaVersion) {
-            case DRAFT_4:
-                return SpecificationVersion.DRAFT_4;
-            case DRAFT_6:
-                return SpecificationVersion.DRAFT_6;
-            case DRAFT_7:
-                return SpecificationVersion.DRAFT_7;
-            case DRAFT_2019_09:
-                return SpecificationVersion.DRAFT_2019_09;
-            case DRAFT_2020_12:
-                return SpecificationVersion.DRAFT_2020_12;
-        }
-        throw new IllegalArgumentException("Unsupported schema version: " + 
schemaVersion);
+        return switch (schemaVersion) {
+            case DRAFT_4 -> SpecificationVersion.DRAFT_4;
+            case DRAFT_6 -> SpecificationVersion.DRAFT_6;
+            case DRAFT_7 -> SpecificationVersion.DRAFT_7;
+            case DRAFT_2019_09 -> SpecificationVersion.DRAFT_2019_09;
+            case DRAFT_2020_12 -> SpecificationVersion.DRAFT_2020_12;
+        };
     }
 
     private JsonSchemaStrategy getSchemaAccessStrategy(PropertyContext 
context) {
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java
index b9d6810f198..4cd772620db 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateJson.java
@@ -49,6 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class TestValidateJson {
     private static final String JSON = getFileContent("simple-example.json");
@@ -62,7 +63,7 @@ class TestValidateJson {
         runner = TestRunners.newTestRunner(ValidateJson.class);
     }
 
-    @ParameterizedTest(name = "{2}")
+    @ParameterizedTest
     @MethodSource("customValidateArgs")
     void testCustomValidateMissingProperty(final 
ValidateJson.JsonSchemaStrategy strategy) {
         runner.setProperty(ValidateJson.SCHEMA_ACCESS_STRATEGY, strategy);
@@ -252,6 +253,35 @@ class TestValidateJson {
         runner.assertTransferCount(ValidateJson.REL_VALID, 1);
     }
 
+    @ParameterizedTest
+    @MethodSource("multilineJsonArgs")
+     void testMultilineJsonWhereSecondLineInvalid(ValidateJson.InputFormat 
inputFormat, boolean expectedValid) {
+        final String multilineJson = """
+                
{"FieldOne":"stringValue","FieldTwo":1234,"FieldThree":[{"arrayField":"arrayValue"}]}
+                
{"FieldOne":"stringValue","FieldTwo":"NAN","FieldThree":[{"arrayField":"arrayValue"}]}
+                """;
+        runner.setProperty(ValidateJson.SCHEMA_CONTENT, SIMPLE_SCHEMA);
+        runner.setProperty(JsonSchemaRegistryComponent.SCHEMA_VERSION, 
SCHEMA_VERSION);
+        runner.setProperty(ValidateJson.INPUT_FORMAT, inputFormat.getValue());
+        runner.enqueue(multilineJson);
+
+        runner.run();
+
+        runner.assertTransferCount(ValidateJson.REL_FAILURE, 0);
+        if (expectedValid) {
+            runner.assertTransferCount(ValidateJson.REL_INVALID, 0);
+            runner.assertTransferCount(ValidateJson.REL_VALID, 1);
+        } else {
+            runner.assertTransferCount(ValidateJson.REL_INVALID, 1);
+            runner.assertTransferCount(ValidateJson.REL_VALID, 0);
+
+            assertTrue(runner.getLogger().getWarnMessages().stream()
+                    .anyMatch(logMessage -> logMessage.getMsg().contains("JSON 
at line 2") && logMessage.getMsg().contains("is invalid")));
+        }
+
+        runner.clearTransferState();
+    }
+
     private void assertValidationErrors(Relationship relationship, boolean 
expected) {
         final Map<String, String> attributes = 
runner.getFlowFilesForRelationship(relationship).getFirst().getAttributes();
 
@@ -265,8 +295,15 @@ class TestValidateJson {
 
     private static Stream<Arguments> customValidateArgs() {
         return Stream.of(
-                
Arguments.of(ValidateJson.JsonSchemaStrategy.SCHEMA_NAME_PROPERTY, "requires 
that the JSON Schema Registry property be set"),
-                
Arguments.of(ValidateJson.JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY, "requires 
that the JSON Schema property be set")
+                Arguments.argumentSet("Require JSON Schema Registry property 
to be set", ValidateJson.JsonSchemaStrategy.SCHEMA_NAME_PROPERTY),
+                Arguments.argumentSet("Require JSON Schema property to be 
set", ValidateJson.JsonSchemaStrategy.SCHEMA_CONTENT_PROPERTY)
+        );
+    }
+
+    private static Stream<Arguments> multilineJsonArgs() {
+        return Stream.of(
+                
Arguments.argumentSet(ValidateJson.InputFormat.FLOW_FILE.getDisplayName(), 
ValidateJson.InputFormat.FLOW_FILE.getValue(), true),
+                
Arguments.argumentSet(ValidateJson.InputFormat.JSON_LINES.getDisplayName(), 
ValidateJson.InputFormat.JSON_LINES.getValue(), false)
         );
     }
 

Reply via email to