[
https://issues.apache.org/jira/browse/NIFI-12407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Giuseppe Gerla updated NIFI-12407:
----------------------------------
Affects Version/s: 1.23.2
(was: 1.16.3)
> SNMP shall include the sender IP and port in the attributes of the flow file
> ----------------------------------------------------------------------------
>
> Key: NIFI-12407
> URL: https://issues.apache.org/jira/browse/NIFI-12407
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Affects Versions: 1.23.2
> Reporter: Giuseppe Gerla
> Priority: Major
> Labels: SNMP
>
> SNMP shall include the sender IP and port in the attributes of the flow file
> To do that we need to modify two classes: SNMPTrapReceiver and SNMPUtils.
> In SNMPUtils, add a new method that builds attributes from the event (for
> now it only includes the sender IP and port):
> public static Map<String, String> getEventAttributeMap(final CommandResponderEvent event) {
>     final Map<String, String> attributes = new HashMap<>();
>     attributes.computeIfAbsent(SNMP_PROP_PREFIX + "sender", v -> String.valueOf(event.getPeerAddress()));
>     return attributes;
> }
>
> In SNMPTrapReceiver, modify the signature of the method createFlowFile to
> accept a CommandResponderEvent instead of a PDU, and call the new method
> added to SNMPUtils to include the new attribute with the sender
> information.
> private FlowFile createFlowFile(final ProcessSession processSession, final CommandResponderEvent event) {
>     final PDU pdu = event.getPDU();
>     FlowFile flowFile = processSession.create();
>     final Map<String, String> attributes;
>     if (pdu instanceof PDUv1) {
>         attributes = SNMPUtils.getV1TrapPduAttributeMap((PDUv1) pdu);
>     } else {
>         attributes = SNMPUtils.getPduAttributeMap(pdu);
>     }
>
>     attributes.putAll(SNMPUtils.getEventAttributeMap(event));
>     flowFile = processSession.putAllAttributes(flowFile, attributes);
>     return flowFile;
> }
>
> @Override
> public void processPdu(final CommandResponderEvent event) {
>     final PDU pdu = event.getPDU();
>     if (isValidTrapPdu(pdu)) {
>         final ProcessSession processSession = processSessionFactory.createSession();
>         final FlowFile flowFile = createFlowFile(processSession, event);
>
>         processSession.getProvenanceReporter().create(flowFile, event.getPeerAddress() + "/" + pdu.getRequestID());
>         if (pdu.getErrorStatus() == PDU.noError) {
>             processSession.transfer(flowFile, REL_SUCCESS);
>         } else {
>             processSession.transfer(flowFile, REL_FAILURE);
>         }
>         processSession.commitAsync();
>     } else {
>         logger.error("Request timed out or parameters are incorrect.");
>     }
> }
>
> To test it
>
> @Test
> void testTrapReceiverSavesSenderIpOnFlowFile() {
>     final CommandResponderEvent mockEvent = mock(CommandResponderEvent.class);
>     Address address = UdpAddress.parse("192.168.0.1/12345");
>     when(mockEvent.getPeerAddress()).thenReturn(address);
>
>     when(mockPdu.getType()).thenReturn(PDU.INFORM);
>     final Vector<VariableBinding> vbs = new Vector<>();
>     doReturn(vbs).when(mockPdu).getVariableBindings();
>     when(mockEvent.getPDU()).thenReturn(mockPdu);
>     when(mockProcessSessionFactory.createSession()).thenReturn(mockProcessSession);
>
>     snmpTrapReceiver.processPdu(mockEvent);
>
>     final List<MockFlowFile> flowFiles = mockProcessSession.getFlowFilesForRelationship(AmsSnmpTrapListener.REL_SUCCESS);
>     final FlowFile flowFile = flowFiles.get(0);
>
>     assertEquals("192.168.0.1/12345", flowFile.getAttribute("snmp$sender"));
> }
--
This message was sent by Atlassian Jira
(v8.20.10#820010)