Giuseppe Gerla created NIFI-12407:
-------------------------------------

             Summary: SNMP shall include the sender IP and port in the 
attributes of the flow file
                 Key: NIFI-12407
                 URL: https://issues.apache.org/jira/browse/NIFI-12407
             Project: Apache NiFi
          Issue Type: Improvement
          Components: Extensions
    Affects Versions: 1.16.3
            Reporter: Giuseppe Gerla


SNMP shall include the sender IP and port in the attributes of the flow file

To do that we need to modify two classes; SNMPTrapReceiver and SNMPUtils.

In SNMPUtils add a new method to include the attributes from the event (we have 
only included the sender ip and port)

    public static Map{*}<{*}String{*},{*} String{*}>{*} 
{*}getEventAttributeMap{*}{*}({*}final CommandResponderEvent event{*}){*} *{*

        final Map{*}<{*}String{*},{*} String{*}>{*} attributes *=* *new* 
HashMap{*}<>();{*}

        attributes{*}.{*}computeIfAbsent{*}({*}SNMP_PROP_PREFIX *+* 
"sender"{*},{*} v *->* 
String{*}.{*}valueOf{*}({*}event{*}.{*}getPeerAddress{*}()));{*}

        *return* attributes{*};{*}

    *}*

 

In SNMPTrapReceiver modify the signature of the method createFlowFile to accept 
a CommandResponderEvent instead of a Pdu and we call the new method added to 
the SNMPUtils to include the new attribute with the sernder information.

    private FlowFile createFlowFile{*}({*}final ProcessSession 
processSession{*},{*} *final* \{*}CommandResponderEvent event{*}{*}){*} *{*

        *final* *PDU pdu* *=* \{*}event{*}{*}.{*}{*}getPDU{*}{*}();{*}

        FlowFile flowFile *=* processSession{*}.{*}create{*}();{*}

        final Map{*}<{*}String{*},{*} String{*}>{*} attributes{*};{*}

        *if* {*}({*}pdu *instanceof* PDUv1{*}){*} *{*

            attributes *=* 
SNMPUtils{*}.{*}getV1TrapPduAttributeMap{*}(({*}PDUv1{*}){*} pdu{*});{*}

        *}* *else* *{*

            attributes *=* SNMPUtils{*}.{*}getPduAttributeMap{*}({*}pdu{*});{*}

        *}*

        
{*}attributes{*}{*}.{*}{*}putAll{*}{*}({*}{*}SNMPUtils{*}{*}.{*}{*}getEventAttributeMap{*}{*}({*}{*}event{*}{*}));{*}

        flowFile *=* 
processSession{*}.{*}putAllAttributes{*}({*}flowFile{*},{*} attributes{*});{*}

        *return* flowFile{*};{*}

    *}*

 

    @Override

    public void processPdu{*}({*}final CommandResponderEvent event{*}){*} *{*

        final PDU pdu *=* event{*}.{*}getPDU{*}();{*}

        *if* {*}({*}isValidTrapPdu{*}({*}pdu{*})){*} *{*

            final ProcessSession processSession *=* 
processSessionFactory{*}.{*}createSession{*}();{*}

            final FlowFile flowFile *=* 
createFlowFile{*}({*}processSession{*},{*} {*}event{*}{*});{*}

            
processSession{*}.{*}getProvenanceReporter{*}().{*}create{*}({*}flowFile{*},{*} 
event{*}.{*}getPeerAddress{*}(){*} *+* "/" *+* pdu{*}.{*}getRequestID{*}());{*}

            *if* {*}({*}pdu{*}.{*}getErrorStatus{*}(){*} *==* 
PDU{*}.{*}noError{*}){*} *{*

                processSession{*}.{*}transfer{*}({*}flowFile{*},{*} 
REL_SUCCESS{*});{*}

            *}* *else* *{*

                processSession{*}.{*}transfer{*}({*}flowFile{*},{*} 
REL_FAILURE{*});{*}

            *}*

            processSession{*}.{*}commitAsync{*}();{*}

        *}* *else* *{*

            logger{*}.{*}error{*}({*}"Request timed out or parameters are 
incorrect."{*});{*}

        *}*

    *}*

 

To test it

 

@Test

    void testTrapReceiverSavesSenderIpOnFlowFile{*}(){*} *{*

        final CommandResponderEvent mockEvent *=* 
mock{*}({*}CommandResponderEvent{*}.{*}class{*});{*}

        Address address *=* 
UdpAddress{*}.{*}parse{*}({*}"[192.168.0.1/12345|http://192.168.0.1/12345]"{*});{*}

        
when{*}({*}mockEvent{*}.{*}getPeerAddress{*}()).{*}thenReturn{*}({*}address{*});{*}

 

        
when{*}({*}mockPdu{*}.{*}getType{*}()).{*}thenReturn{*}({*}PDU{*}.{*}INFORM{*});{*}

        final Vector{*}<{*}VariableBinding{*}>{*} vbs *=* *new* 
Vector{*}<>();{*}

        
doReturn{*}({*}vbs{*}).{*}when{*}({*}mockPdu{*}).{*}getVariableBindings{*}();{*}

        
when{*}({*}mockEvent{*}.{*}getPDU{*}()).{*}thenReturn{*}({*}mockPdu{*});{*}

        
when{*}({*}mockProcessSessionFactory{*}.{*}createSession{*}()).{*}thenReturn{*}({*}mockProcessSession{*});{*}

 

        snmpTrapReceiver{*}.{*}processPdu{*}({*}mockEvent{*});{*}

 

        final List{*}<{*}MockFlowFile{*}>{*} flowFiles *=* 
mockProcessSession{*}.{*}getFlowFilesForRelationship{*}({*}AmsSnmpTrapListener{*}.{*}REL_SUCCESS{*});{*}

        final FlowFile flowFile *=* flowFiles{*}.{*}get{*}({*}0{*});{*}

 

        
assertEquals{*}({*}"[192.168.0.1/12345|http://192.168.0.1/12345]"{*},{*} 
flowFile{*}.{*}getAttribute{*}({*}"snmp$sender"{*}));{*}

    *}*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to