ottobackwards commented on a change in pull request #3414: NIFI-5900 Added 
SplitLargeJson processor
URL: https://github.com/apache/nifi/pull/3414#discussion_r274513163
 
 

 ##########
 File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitLargeJson.java
 ##########
 @@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.JsonFragmentWriter;
+import org.apache.nifi.processors.standard.util.JsonStack;
+import org.apache.nifi.processors.standard.util.SimpleJsonPath;
+
+import javax.json.Json;
+import javax.json.stream.JsonParser;
+import javax.json.stream.JsonParsingException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"json", "split", "jsonpath"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Splits a JSON File into multiple, separate FlowFiles.  
Each element of the object or array " +
+        "specified by the JSON Path will be sent in a dedicated flowfile to 
the 'split' relationship.  The " +
+        "original file will be transferred to the 'original' relationship.  If 
the specified JSON Path is not found, " +
+        "the original file is routed to 'failure' and no files are generated. 
"+
+        "The purpose of this processor is to minimize the amount of memory 
needed to split a JSON document.  It is " +
+        "intended for use with large JSON files when the JSON Path is 
rudimentary. This processor conserves memory " +
+        "by reading documents in a streaming fashion (without loading the 
whole document into memory at once) and " +
+        "therefore cannot support all JSON Path expressions.  Note, however, 
that a fragment.index attribute is set " +
+        "on every outgoing FlowFile. This, in combination with a 
RouteOnAttribute processor, can be used in place " +
+        "of certain JSON Path constructs. The specified JSON Path will be 
checked and the processor will reflect " +
+        "an invalid state if the expression is not supported.  Lastly, in 
contrast with the SplitJson processor, " +
+        "every generated flowfile will always be a self-contained valid JSON 
document.  For example, when " +
+        "splitting an array of scalar values, each resulting document will be 
framed in array context.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "fragment.identifier",
+                description = "All split FlowFiles produced from the same 
parent FlowFile will have the same " +
+                        "randomly generated UUID added for this attribute"),
+        @WritesAttribute(attribute = "fragment.index",
+                description = "A one-up number that indicates the ordering of 
the split FlowFiles that were " +
+                        "created from a single parent FlowFile"),
+        @WritesAttribute(attribute = "segment.original.filename ", description 
= "The filename of the parent FlowFile")
+})
+@SeeAlso(SplitJson.class)
+public class SplitLargeJson extends AbstractProcessor {
+
+    public static final PropertyDescriptor JSON_PATH_EXPRESSION = new 
PropertyDescriptor.Builder()
+            .name("JsonPath Expression")
+            .description("A JsonPath expression that indicates the array 
element to split into JSON/scalar fragments.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // Full 
validation occurs in #customValidate
+            .required(true)
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("The original FlowFile that was split into segments. 
If the FlowFile fails processing, " +
+                    "nothing will be sent to this relationship")
+            .build();
+    public static final Relationship REL_SPLIT = new Relationship.Builder()
+            .name("split")
+            .description("All segments of the original FlowFile will be routed 
to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for 
example, the FlowFile is not valid " +
+                    "JSON or the specified path does not exist), it will be 
routed to this relationship")
+            .build();
+
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(JSON_PATH_EXPRESSION);
+        this.properties = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_ORIGINAL);
+        relationships.add(REL_SPLIT);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        String value = 
validationContext.getProperty(JSON_PATH_EXPRESSION).getValue();
+        ValidationResult.Builder vrb = new ValidationResult.Builder();
+        try {
+            SimpleJsonPath.of(value);
+            vrb.valid(true);
+        } catch (Exception e) {
+            vrb.subject(value);
+            vrb.input(getClass().getName());
+            vrb.explanation("the specified JSON path is either invalid or not 
supported by this processor.");
+        }
+        return Collections.singleton(vrb.build());
+    }
+
+    /** Provide read-only access to a JsonParser, exposing only the getter 
methods for the current value */
+    public static class JsonParserView {
+        private final JsonParser parser;
+
+        JsonParserView(JsonParser parser) {
+            this.parser = parser;
+        }
+
+        public String getString() {
+            return parser.getString();
+        }
+
+        public BigDecimal getBigDecimal() {
+            return parser.getBigDecimal();
+        }
+
+        public int getInt() {
+            return parser.getInt();
+        }
+
+        public boolean isIntegralNumber() {
+            return parser.isIntegralNumber();
+        }
+    }
+
+    @Override
 
 Review comment:
   Could we have javadoc here to describe how this all works?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to