[
https://issues.apache.org/jira/browse/NIFI-5327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599489#comment-16599489
]
ASF GitHub Bot commented on NIFI-5327:
--------------------------------------
Github user MikeThomsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2820#discussion_r214501922
--- Diff:
nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java
---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network;
+
+import static
org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
+import static
org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processors.network.parser.Netflowv5Parser;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet",
"byte" })
+@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi
flowfile as attributes or JSON content.")
+@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description =
"Optionally read if packets are received from UDP datagrams.") })
+@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*",
description = "The key and value generated by the parsing of the header
fields."),
+ @WritesAttribute(attribute = "netflowv5.record.*", description =
"The key and value generated by the parsing of the record fields.") })
+
+public class ParseNetflowv5 extends AbstractProcessor {
+ private String destination;
+ // Add mapper
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ public static final String DESTINATION_CONTENT = "flowfile-content";
+ public static final String DESTINATION_ATTRIBUTES =
"flowfile-attribute";
+ public static final PropertyDescriptor FIELDS_DESTINATION = new
PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed
fields destination")
+ .description("Indicates whether the results of the parser are
written " + "to the FlowFile content or a FlowFile attribute; if using " +
DESTINATION_ATTRIBUTES
+ + ", fields will be populated as attributes. If set to
" + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat
JSON object.")
+ .required(true).allowableValues(DESTINATION_CONTENT,
DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
+
+ public static final Relationship REL_FAILURE = new
Relationship.Builder().name("failure")
+ .description("Any FlowFile that could not be parsed as a
netflowv5 message will be transferred to this Relationship without any
attributes being added").build();
+ public static final Relationship REL_ORIGINAL = new
Relationship.Builder().name("original").description("The original raw
content").build();
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder().name("success")
+ .description("Any FlowFile that is successfully parsed as a
netflowv5 data will be transferred to this Relationship.").build();
+
+ public static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
+ public static final Set<Relationship> RELATIONSHIPS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE,
REL_ORIGINAL, REL_SUCCESS)));
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ destination = context.getProperty(FIELDS_DESTINATION).getValue();
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final OptionalInt portNumber = resolvePort(flowFile);
+ final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
+
+ final byte[] buffer = new byte[(int) flowFile.getSize()];
+ session.read(flowFile, new InputStreamCallback() {
+
+ @Override
+ public void process(final InputStream in) throws IOException {
+ StreamUtils.fillBuffer(in, buffer);
+ }
+ });
+
+ final int processedRecord;
+ try {
+ processedRecord = parser.parse(buffer);
+ getLogger().debug("Parsed {} records from the packet", new
Object[] { processedRecord });
+ } catch (Throwable e) {
+ getLogger().error("Parser returned unexpected Exception {}
while processing {}; routing to failure", new Object[] { e, flowFile });
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ try {
+ final List<FlowFile> multipleRecords = new ArrayList<>();
+ switch (destination) {
+ case DESTINATION_ATTRIBUTES:
+ final Map<String, String> attributes = new HashMap<>();
+ generateKV(multipleRecords, session, flowFile, attributes,
parser, processedRecord);
+ break;
+ case DESTINATION_CONTENT:
+ generateJSON(multipleRecords, session, flowFile, parser,
processedRecord, buffer);
+ break;
+ }
+ // Create a provenance event recording the routing to success
+ multipleRecords.forEach(recordFlowFile ->
session.getProvenanceReporter().route(recordFlowFile, REL_SUCCESS));
+ session.getProvenanceReporter().route(flowFile, REL_ORIGINAL);
+ // Ready to transfer and commit
+ session.transfer(flowFile, REL_ORIGINAL);
+ session.transfer(multipleRecords, REL_SUCCESS);
+ session.adjustCounter("Records Processed", processedRecord,
false);
+ session.commit();
+ } catch (Exception e) {
+ // The flowfile has failed parsing & validation, routing to
failure
+ getLogger().error("Failed to parse {} as a netflowv5 message
due to {}; routing to failure", new Object[] { flowFile, e });
+ // Create a provenance event recording the routing to failure
+ session.getProvenanceReporter().route(flowFile, REL_FAILURE);
+ session.transfer(flowFile, REL_FAILURE);
+ session.commit();
+ return;
+ } finally {
+ session.rollback();
+ }
+ }
+
+ private void generateJSON(final List<FlowFile> multipleRecords, final
ProcessSession session, final FlowFile flowFile, final Netflowv5Parser parser,
final int processedRecord, final byte[] buffer)
+ throws JsonProcessingException {
+ int numberOfRecords = processedRecord;
+ FlowFile recordFlowFile = flowFile;
+ int record = 0;
+ while (numberOfRecords-- > 0) {
+ ObjectNode results = mapper.createObjectNode();
+ // Add Port number and message format
+ results.set("port",
mapper.valueToTree(parser.getPortNumber()));
+ results.set("format", mapper.valueToTree("netflowv5"));
+
+ recordFlowFile = session.clone(flowFile);
+ // Add JSON Objects
+ generateJSONUtil(results, parser, record++);
+
+ recordFlowFile = session.write(recordFlowFile, new
OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out) throws IOException {
+ try (OutputStream outputStream = new
BufferedOutputStream(out)) {
+
outputStream.write(mapper.writeValueAsBytes(results));
+ }
+ }
+ });
+ // Adjust the FlowFile mime.type attribute
+ recordFlowFile = session.putAttribute(recordFlowFile,
CoreAttributes.MIME_TYPE.key(), "application/json");
+ // Update the provenance for good measure
+ session.getProvenanceReporter().modifyContent(recordFlowFile,
"Replaced content with parsed netflowv5 fields and values");
--- End diff --
If you go with `create` you can get rid of this. Something to keep in mind
here is that depending on the volume and velocity of the data, you could easily
overpower the older provenance repository. The write-ahead one could probably
handle it, but you might cause problems for people who didn't migrate (and I'm
not sure if the write-ahead one is default now)
> NetFlow Processors
> ------------------
>
> Key: NIFI-5327
> URL: https://issues.apache.org/jira/browse/NIFI-5327
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Core Framework
> Affects Versions: 1.6.0
> Reporter: Prashanth Venkatesan
> Assignee: Prashanth Venkatesan
> Priority: Major
>
> As network traffic data increasingly falls within the scope of big data use
> cases, it would be valuable for NiFi to have processors that support parsing
> these protocols.
> Netflow is a protocol introduced by Cisco that provides the ability to
> collect IP network traffic as it enters or exits an interface and is
> described in detail in here:
> [https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html]
>
> Currently, I have created the following processor:
> *ParseNetflowv5*: Parses the ingress netflowv5 bytes and ingest as either
> NiFi flowfile attributes or as a JSON content. This also sends
> one-time-template.
>
> Further ahead, we can add many processor specific to network protocols in
> this nar bundle.
> I will create a pull request.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)