Github user PrashanthVenkatesan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2820#discussion_r218292092
--- Diff:
nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java
---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network;
+
+import static
org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
+import static
org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processors.network.parser.Netflowv5Parser;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet",
"byte" })
+@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi
flowfile as attributes or JSON content.")
+@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description =
"Optionally read if packets are received from UDP datagrams.") })
+@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*",
description = "The key and value generated by the parsing of the header
fields."),
+ @WritesAttribute(attribute = "netflowv5.record.*", description =
"The key and value generated by the parsing of the record fields.") })
+
+public class ParseNetflowv5 extends AbstractProcessor {
+ private String destination;
+ // Add mapper
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ public static final String DESTINATION_CONTENT = "flowfile-content";
+ public static final String DESTINATION_ATTRIBUTES =
"flowfile-attribute";
+ public static final PropertyDescriptor FIELDS_DESTINATION = new
PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed
fields destination")
+ .description("Indicates whether the results of the parser are
written " + "to the FlowFile content or a FlowFile attribute; if using " +
DESTINATION_ATTRIBUTES
+ + ", fields will be populated as attributes. If set to
" + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat
JSON object.")
+ .required(true).allowableValues(DESTINATION_CONTENT,
DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
+
+ public static final Relationship REL_FAILURE = new
Relationship.Builder().name("failure")
+ .description("Any FlowFile that could not be parsed as a
netflowv5 message will be transferred to this Relationship without any
attributes being added").build();
+ public static final Relationship REL_ORIGINAL = new
Relationship.Builder().name("original").description("The original raw
content").build();
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder().name("success")
+ .description("Any FlowFile that is successfully parsed as a
netflowv5 data will be transferred to this Relationship.").build();
+
+ public static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
+ public static final Set<Relationship> RELATIONSHIPS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE,
REL_ORIGINAL, REL_SUCCESS)));
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ destination = context.getProperty(FIELDS_DESTINATION).getValue();
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final OptionalInt portNumber = resolvePort(flowFile);
+ final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
+
+ final byte[] buffer = new byte[(int) flowFile.getSize()];
+ session.read(flowFile, new InputStreamCallback() {
+
+ @Override
+ public void process(final InputStream in) throws IOException {
+ StreamUtils.fillBuffer(in, buffer);
+ }
+ });
+
+ final int processedRecord;
+ try {
+ processedRecord = parser.parse(buffer);
+ getLogger().debug("Parsed {} records from the packet", new
Object[] { processedRecord });
+ } catch (Throwable e) {
+ getLogger().error("Parser returned unexpected Exception {}
while processing {}; routing to failure", new Object[] { e, flowFile });
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ try {
+ final List<FlowFile> multipleRecords = new ArrayList<>();
+ switch (destination) {
+ case DESTINATION_ATTRIBUTES:
+ final Map<String, String> attributes = new HashMap<>();
+ generateKV(multipleRecords, session, flowFile, attributes,
parser, processedRecord);
+ break;
+ case DESTINATION_CONTENT:
+ generateJSON(multipleRecords, session, flowFile, parser,
processedRecord, buffer);
+ break;
+ }
+ // Create a provenance event recording the routing to success
+ multipleRecords.forEach(recordFlowFile ->
session.getProvenanceReporter().route(recordFlowFile, REL_SUCCESS));
+ session.getProvenanceReporter().route(flowFile, REL_ORIGINAL);
+ // Ready to transfer and commit
+ session.transfer(flowFile, REL_ORIGINAL);
+ session.transfer(multipleRecords, REL_SUCCESS);
+ session.adjustCounter("Records Processed", processedRecord,
false);
+ session.commit();
+ } catch (Exception e) {
+ // The flowfile has failed parsing & validation, routing to
failure
+ getLogger().error("Failed to parse {} as a netflowv5 message
due to {}; routing to failure", new Object[] { flowFile, e });
+ // Create a provenance event recording the routing to failure
+ session.getProvenanceReporter().route(flowFile, REL_FAILURE);
+ session.transfer(flowFile, REL_FAILURE);
+ session.commit();
+ return;
+ } finally {
+ session.rollback();
+ }
+ }
+
+ private void generateJSON(final List<FlowFile> multipleRecords, final
ProcessSession session, final FlowFile flowFile, final Netflowv5Parser parser,
final int processedRecord, final byte[] buffer)
+ throws JsonProcessingException {
+ int numberOfRecords = processedRecord;
+ FlowFile recordFlowFile = flowFile;
+ int record = 0;
+ while (numberOfRecords-- > 0) {
+ ObjectNode results = mapper.createObjectNode();
+ // Add Port number and message format
+ results.set("port",
mapper.valueToTree(parser.getPortNumber()));
+ results.set("format", mapper.valueToTree("netflowv5"));
+
+ recordFlowFile = session.clone(flowFile);
--- End diff --
I can use session.create(). But do you think users may need, in the future,
to have the attributes of the original FlowFile on each record FlowFile? If
so, we should stick with session.clone(). What's your view on this?
---