exceptionfactory commented on code in PR #11012:
URL: https://github.com/apache/nifi/pull/11012#discussion_r2998695036


##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java:
##########
@@ -80,8 +83,8 @@
             + ", this attribute will contain the error message resulting from the validation failure.")
 })
 @CapabilityDescription("Validates the contents of FlowFiles against a configurable JSON Schema. See json-schema.org for specification standards. " +
-        "This Processor does not support input containing multiple JSON objects, such as newline-delimited JSON. If the input FlowFile contains " +
-        "newline-delimited JSON, only the first line will be validated."
+        "This Processor supports input containing multiple JSON objects using newline-delimited JSON based on configuration properties, " +
+        "otherwise if the input FlowFile contains newline-delimited JSON, only the first line will be validated."

Review Comment:
   This is a good opportunity to switch the value to a multi-line string.
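   A minimal sketch of that suggestion, assuming a Java release with text blocks (Java 15 or later); the holder class and constant names are hypothetical. A trailing `\` inside a text block suppresses the line break, so the text block below yields the same single-line value as the current concatenation. Dropping those backslashes would instead embed line breaks in the description. Both forms are compile-time constants, so either is usable as the `@CapabilityDescription` value.

```java
// Hypothetical holder class comparing the concatenated description from the
// hunk above with an equivalent Java text block (Java 15+).
class CapabilityDescriptionSketch {

    // Value from the PR hunk, built by concatenating string literals
    static final String CONCATENATED = "Validates the contents of FlowFiles against a configurable JSON Schema. "
            + "See json-schema.org for specification standards. "
            + "This Processor supports input containing multiple JSON objects using newline-delimited JSON based on configuration properties, "
            + "otherwise if the input FlowFile contains newline-delimited JSON, only the first line will be validated.";

    // Same value as a text block: each trailing backslash joins the next line
    // without a newline, and placing the closing delimiter on the last content
    // line avoids a trailing newline.
    static final String TEXT_BLOCK = """
            Validates the contents of FlowFiles against a configurable JSON Schema. \
            See json-schema.org for specification standards. \
            This Processor supports input containing multiple JSON objects using \
            newline-delimited JSON based on configuration properties, \
            otherwise if the input FlowFile contains newline-delimited JSON, \
            only the first line will be validated.""";

    public static void main(final String[] args) {
        System.out.println(CONCATENATED.equals(TEXT_BLOCK)); // prints true
    }
}
```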



##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java:
##########
@@ -298,16 +338,25 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             }
         }
 
+        if (schema == null) {
+            getLogger().error("JSON schema not configured for {}", flowFile);
+            session.getProvenanceReporter().route(flowFile, REL_FAILURE);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final InputFormat inputFormat = context.getProperty(INPUT_FORMAT).asAllowableValue(InputFormat.class);
+        if (inputFormat == InputFormat.FLOW_FILE) {
+            validateSingleJson(session, flowFile);
+        } else {
+            validateJsonLines(session, flowFile);
+        }
+    }
+
+    void validateSingleJson(ProcessSession session, FlowFile flowFile) {

Review Comment:
   `validateFlowFile` seems like a better name given the Input Format value.
   
   ```suggestion
       void validateFlowFile(final ProcessSession session, final FlowFile flowFile) {
   ```



##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java:
##########
@@ -327,24 +376,56 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         }
     }
 
+    void validateJsonLines(ProcessSession session, FlowFile flowFile) {
+
+        try (final InputStream in = session.read(flowFile);
+             final LineNumberReader reader = new LineNumberReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
+
+            String line;
+
+            while ((line = reader.readLine()) != null) {
+                if (line.isBlank()) {
+                    continue;
+                }
+
+                final JsonNode node = mapper.readTree(line);
+                final List<Error> errors = schema.validate(node);
+
+                if (!errors.isEmpty()) {
+                    reader.close(); // NOTE: Must call close otherwise get IllegalStateException indicating FlowFile already in use
+                    // by an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed
+                    final String validationMessages = errors.toString();
+                    final String validationErrMsg = "JSON at line %s is invalid: %s".formatted(reader.getLineNumber(), validationMessages);
+                    flowFile = session.putAttribute(flowFile, ERROR_ATTRIBUTE_KEY, validationErrMsg);
+                    getLogger().warn("JSON at line {} in {} is invalid: Validation Errors {}", reader.getLineNumber(), flowFile, validationMessages);
+                    session.getProvenanceReporter().route(flowFile, REL_INVALID);
+                    session.transfer(flowFile, REL_INVALID);
+                    return;
+                }
+            }
+
+            session.getProvenanceReporter().route(flowFile, REL_VALID);
+            session.transfer(flowFile, REL_VALID);
+
+        } catch (Exception e) {

Review Comment:
   ```suggestion
           } catch (final Exception e) {
   ```
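   The JSON Lines loop in the hunk above can be sketched outside NiFi as follows. This is a sketch under assumptions: the `validator` function is a hypothetical stand-in for `mapper.readTree(line)` plus `schema.validate(node)`, and `LineFailure` and `findFirstInvalidLine` are hypothetical names. Returning the first failure from inside the try-with-resources block closes the `LineNumberReader` (and the reader it wraps) before the caller receives the result, so a caller could then call `session.putAttribute` and `session.transfer` without the explicit `reader.close()` the hunk uses to avoid the `IllegalStateException` noted in its comment. Assigning the FlowFile returned by `session.putAttribute` to a new local variable in such a caller would also keep the suggested `final FlowFile flowFile` parameter compilable, since the hunk currently reassigns that parameter.

```java
import java.io.IOException;
import java.io.LineNumberReader;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

class JsonLinesSketch {

    // Hypothetical result type: 1-based line number of the first invalid record and its errors
    record LineFailure(int lineNumber, List<String> errors) { }

    // Scans newline-delimited input and stops at the first line the validator rejects.
    // The validator stands in for parsing the line and validating it against a schema.
    static Optional<LineFailure> findFirstInvalidLine(final Reader source, final Function<String, List<String>> validator) {
        try (final LineNumberReader reader = new LineNumberReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isBlank()) {
                    continue; // blank lines are skipped but still counted by getLineNumber()
                }
                final List<String> errors = validator.apply(line);
                if (!errors.isEmpty()) {
                    // After readLine() returns the Nth line, getLineNumber() returns N;
                    // the reader is closed as this return leaves the try block
                    return Optional.of(new LineFailure(reader.getLineNumber(), errors));
                }
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        // Every non-blank line passed; the reader is already closed here
        return Optional.empty();
    }

    public static void main(final String[] args) {
        // Toy validator: accept lines that look like JSON objects, reject everything else
        final Function<String, List<String>> validator =
                line -> line.startsWith("{") ? List.of() : List.of("expected a JSON object");

        final String input = "{\"id\":1}\n\n[1, 2]\n{\"id\":3}\n";
        findFirstInvalidLine(new StringReader(input), validator).ifPresentOrElse(
                failure -> System.out.printf("JSON at line %d is invalid: %s%n", failure.lineNumber(), failure.errors()),
                () -> System.out.println("All lines valid"));
    }
}
```

   With the sample input, the blank second line is skipped but counted, so the failure is reported at line 3: `JSON at line 3 is invalid: [expected a JSON object]`.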



##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java:
##########
@@ -125,7 +128,35 @@ public String getDescription() {
         }
     }
 
-    protected static final String ERROR_ATTRIBUTE_KEY = "json.validation.errors";
+    public enum InputFormat implements DescribedValue {

Review Comment:
   ```suggestion
       enum InputFormat implements DescribedValue {
   ```
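   A sketch of what the package-private enum could look like, assuming `DescribedValue` exposes `getValue()`, `getDisplayName()`, and `getDescription()` as `org.apache.nifi.components.DescribedValue` does; a local stand-in interface replaces it so the example compiles without NiFi. Only `FLOW_FILE` appears in the hunks; `JSON_LINES`, the display names, the descriptions, and the `fromValue` helper are hypothetical. Package-private visibility is enough when the processor and its tests share a package, and `fromValue` only approximates how `asAllowableValue(InputFormat.class)` resolves the configured value to a constant.

```java
import java.util.Arrays;

class InputFormatDemo {
    public static void main(final String[] args) {
        final InputFormat format = InputFormat.fromValue("JSON_LINES");
        System.out.println(format.getDisplayName()); // prints JSON Lines
    }
}

// Stand-in for org.apache.nifi.components.DescribedValue so the sketch compiles without NiFi
interface DescribedValue {
    String getValue();
    String getDisplayName();
    String getDescription();
}

// No public modifier: visible to classes in the same package, such as tests for the processor.
// JSON_LINES and all display names and descriptions are hypothetical.
enum InputFormat implements DescribedValue {
    FLOW_FILE("FlowFile", "Validate the FlowFile content as a single JSON document"),
    JSON_LINES("JSON Lines", "Validate each non-blank line as a separate JSON document");

    private final String displayName;
    private final String description;

    InputFormat(final String displayName, final String description) {
        this.displayName = displayName;
        this.description = description;
    }

    @Override
    public String getValue() {
        return name(); // the constant name doubles as the stored property value
    }

    @Override
    public String getDisplayName() {
        return displayName;
    }

    @Override
    public String getDescription() {
        return description;
    }

    // Hypothetical helper: resolve a configured value to its constant, rejecting unknown values
    static InputFormat fromValue(final String value) {
        return Arrays.stream(values())
                .filter(format -> format.getValue().equals(value))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Unsupported Input Format: " + value));
    }
}
```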



##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateJson.java:
##########
@@ -327,24 +376,56 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         }
     }
 
+    void validateJsonLines(ProcessSession session, FlowFile flowFile) {

Review Comment:
   ```suggestion
       void validateJsonLines(final ProcessSession session, final FlowFile flowFile) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
