Giuseppe Gerla created NIFI-12407:
-------------------------------------
Summary: SNMP shall include the sender IP and port in the
attributes of the flow file
Key: NIFI-12407
URL: https://issues.apache.org/jira/browse/NIFI-12407
Project: Apache NiFi
Issue Type: Improvement
Components: Extensions
Affects Versions: 1.16.3
Reporter: Giuseppe Gerla
SNMP shall include the sender IP and port in the attributes of the flow file
To do that we need to modify two classes; SNMPTrapReceiver and SNMPUtils.
In SNMPUtils add a new method to include the attributes from the event (we have
only included the sender ip and port)
public static Map{*}<{*}String{*},{*} String{*}>{*}
{*}getEventAttributeMap{*}{*}({*}final CommandResponderEvent event{*}){*} *{*
final Map{*}<{*}String{*},{*} String{*}>{*} attributes *=* *new*
HashMap{*}<>();{*}
attributes{*}.{*}computeIfAbsent{*}({*}SNMP_PROP_PREFIX *+*
"sender"{*},{*} v *->*
String{*}.{*}valueOf{*}({*}event{*}.{*}getPeerAddress{*}()));{*}
*return* attributes{*};{*}
*}*
In SNMPTrapReceiver modify the signature of the method createFlowFile to accept
a CommandResponderEvent instead of a Pdu and we call the new method added to
the SNMPUtils to include the new attribute with the sernder information.
private FlowFile createFlowFile{*}({*}final ProcessSession
processSession{*},{*} *final* \{*}CommandResponderEvent event{*}{*}){*} *{*
*final* *PDU pdu* *=* \{*}event{*}{*}.{*}{*}getPDU{*}{*}();{*}
FlowFile flowFile *=* processSession{*}.{*}create{*}();{*}
final Map{*}<{*}String{*},{*} String{*}>{*} attributes{*};{*}
*if* {*}({*}pdu *instanceof* PDUv1{*}){*} *{*
attributes *=*
SNMPUtils{*}.{*}getV1TrapPduAttributeMap{*}(({*}PDUv1{*}){*} pdu{*});{*}
*}* *else* *{*
attributes *=* SNMPUtils{*}.{*}getPduAttributeMap{*}({*}pdu{*});{*}
*}*
{*}attributes{*}{*}.{*}{*}putAll{*}{*}({*}{*}SNMPUtils{*}{*}.{*}{*}getEventAttributeMap{*}{*}({*}{*}event{*}{*}));{*}
flowFile *=*
processSession{*}.{*}putAllAttributes{*}({*}flowFile{*},{*} attributes{*});{*}
*return* flowFile{*};{*}
*}*
@Override
public void processPdu{*}({*}final CommandResponderEvent event{*}){*} *{*
final PDU pdu *=* event{*}.{*}getPDU{*}();{*}
*if* {*}({*}isValidTrapPdu{*}({*}pdu{*})){*} *{*
final ProcessSession processSession *=*
processSessionFactory{*}.{*}createSession{*}();{*}
final FlowFile flowFile *=*
createFlowFile{*}({*}processSession{*},{*} {*}event{*}{*});{*}
processSession{*}.{*}getProvenanceReporter{*}().{*}create{*}({*}flowFile{*},{*}
event{*}.{*}getPeerAddress{*}(){*} *+* "/" *+* pdu{*}.{*}getRequestID{*}());{*}
*if* {*}({*}pdu{*}.{*}getErrorStatus{*}(){*} *==*
PDU{*}.{*}noError{*}){*} *{*
processSession{*}.{*}transfer{*}({*}flowFile{*},{*}
REL_SUCCESS{*});{*}
*}* *else* *{*
processSession{*}.{*}transfer{*}({*}flowFile{*},{*}
REL_FAILURE{*});{*}
*}*
processSession{*}.{*}commitAsync{*}();{*}
*}* *else* *{*
logger{*}.{*}error{*}({*}"Request timed out or parameters are
incorrect."{*});{*}
*}*
*}*
To test it
@Test
void testTrapReceiverSavesSenderIpOnFlowFile{*}(){*} *{*
final CommandResponderEvent mockEvent *=*
mock{*}({*}CommandResponderEvent{*}.{*}class{*});{*}
Address address *=*
UdpAddress{*}.{*}parse{*}({*}"[192.168.0.1/12345|http://192.168.0.1/12345]"{*});{*}
when{*}({*}mockEvent{*}.{*}getPeerAddress{*}()).{*}thenReturn{*}({*}address{*});{*}
when{*}({*}mockPdu{*}.{*}getType{*}()).{*}thenReturn{*}({*}PDU{*}.{*}INFORM{*});{*}
final Vector{*}<{*}VariableBinding{*}>{*} vbs *=* *new*
Vector{*}<>();{*}
doReturn{*}({*}vbs{*}).{*}when{*}({*}mockPdu{*}).{*}getVariableBindings{*}();{*}
when{*}({*}mockEvent{*}.{*}getPDU{*}()).{*}thenReturn{*}({*}mockPdu{*});{*}
when{*}({*}mockProcessSessionFactory{*}.{*}createSession{*}()).{*}thenReturn{*}({*}mockProcessSession{*});{*}
snmpTrapReceiver{*}.{*}processPdu{*}({*}mockEvent{*});{*}
final List{*}<{*}MockFlowFile{*}>{*} flowFiles *=*
mockProcessSession{*}.{*}getFlowFilesForRelationship{*}({*}AmsSnmpTrapListener{*}.{*}REL_SUCCESS{*});{*}
final FlowFile flowFile *=* flowFiles{*}.{*}get{*}({*}0{*});{*}
assertEquals{*}({*}"[192.168.0.1/12345|http://192.168.0.1/12345]"{*},{*}
flowFile{*}.{*}getAttribute{*}({*}"snmp$sender"{*}));{*}
*}*
--
This message was sent by Atlassian Jira
(v8.20.10#820010)