[ 
https://issues.apache.org/jira/browse/NIFI-12407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Giuseppe Gerla updated NIFI-12407:
----------------------------------
    Affects Version/s: 1.23.2
                           (was: 1.16.3)

> SNMP shall include the sender IP and port in the attributes of the flow file
> ----------------------------------------------------------------------------
>
>                 Key: NIFI-12407
>                 URL: https://issues.apache.org/jira/browse/NIFI-12407
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>    Affects Versions: 1.23.2
>            Reporter: Giuseppe Gerla
>            Priority: Major
>              Labels: SNMP
>
> SNMP shall include the sender IP and port in the attributes of the flow file.
> To do that we need to modify two classes: SNMPTrapReceiver and SNMPUtils.
> In SNMPUtils, add a new method that builds the attributes from the event (for
> now it only includes the sender IP and port):
>     public static Map<String, String> getEventAttributeMap(final CommandResponderEvent event) {
>         final Map<String, String> attributes = new HashMap<>();
>         attributes.computeIfAbsent(SNMP_PROP_PREFIX + "sender", v -> String.valueOf(event.getPeerAddress()));
>         return attributes;
>     }
>  
> In SNMPTrapReceiver, modify the signature of the method createFlowFile to
> accept a CommandResponderEvent instead of a PDU, and call the new method
> added to SNMPUtils to include the new attribute with the sender
> information.
>     private FlowFile createFlowFile(final ProcessSession processSession, final CommandResponderEvent event) {
>         final PDU pdu = event.getPDU();
>         FlowFile flowFile = processSession.create();
>         final Map<String, String> attributes;
>         if (pdu instanceof PDUv1) {
>             attributes = SNMPUtils.getV1TrapPduAttributeMap((PDUv1) pdu);
>         } else {
>             attributes = SNMPUtils.getPduAttributeMap(pdu);
>         }
>         attributes.putAll(SNMPUtils.getEventAttributeMap(event));
>         flowFile = processSession.putAllAttributes(flowFile, attributes);
>         return flowFile;
>     }
>  
>     @Override
>     public void processPdu(final CommandResponderEvent event) {
>         final PDU pdu = event.getPDU();
>         if (isValidTrapPdu(pdu)) {
>             final ProcessSession processSession = processSessionFactory.createSession();
>             final FlowFile flowFile = createFlowFile(processSession, event);
>             processSession.getProvenanceReporter().create(flowFile, event.getPeerAddress() + "/" + pdu.getRequestID());
>             if (pdu.getErrorStatus() == PDU.noError) {
>                 processSession.transfer(flowFile, REL_SUCCESS);
>             } else {
>                 processSession.transfer(flowFile, REL_FAILURE);
>             }
>             processSession.commitAsync();
>         } else {
>             logger.error("Request timed out or parameters are incorrect.");
>         }
>     }
>  
> To test it
>  
>     @Test
>     void testTrapReceiverSavesSenderIpOnFlowFile() {
>         final CommandResponderEvent mockEvent = mock(CommandResponderEvent.class);
>         Address address = UdpAddress.parse("192.168.0.1/12345");
>         when(mockEvent.getPeerAddress()).thenReturn(address);
>
>         when(mockPdu.getType()).thenReturn(PDU.INFORM);
>         final Vector<VariableBinding> vbs = new Vector<>();
>         doReturn(vbs).when(mockPdu).getVariableBindings();
>         when(mockEvent.getPDU()).thenReturn(mockPdu);
>         when(mockProcessSessionFactory.createSession()).thenReturn(mockProcessSession);
>
>         snmpTrapReceiver.processPdu(mockEvent);
>
>         final List<MockFlowFile> flowFiles = mockProcessSession.getFlowFilesForRelationship(AmsSnmpTrapListener.REL_SUCCESS);
>         final FlowFile flowFile = flowFiles.get(0);
>
>         assertEquals("192.168.0.1/12345", flowFile.getAttribute("snmp$sender"));
>     }



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to