tpalfy commented on code in PR #8160:
URL: https://github.com/apache/nifi/pull/8160#discussion_r1465260947
##########
nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/processors/GetSNMP.java:
##########
@@ -148,97 +148,89 @@ public class GetSNMP extends AbstractSNMPProcessor {
@OnScheduled
public void init(final ProcessContext context) {
initSnmpManager(context);
- snmpHandler = new GetSNMPHandler(snmpResourceHandler);
+ snmpHandler = new GetSNMPHandler(snmpManager);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
processSession) {
final SNMPStrategy snmpStrategy =
SNMPStrategy.valueOf(context.getProperty(SNMP_STRATEGY).getValue());
final String oid = context.getProperty(OID).getValue();
- final FlowFile flowfile = processSession.get();
+ final boolean isNewFlowFileCreated;
+ FlowFile flowfile = processSession.get();
+ if (flowfile == null) {
+ isNewFlowFileCreated = true;
+ flowfile = processSession.create();
+ } else {
+ isNewFlowFileCreated = false;
+ }
+ final Target target =
factory.createTargetInstance(getTargetConfiguration(context, flowfile));
if (SNMPStrategy.GET == snmpStrategy) {
- performSnmpGet(context, processSession, oid, flowfile);
+ performSnmpGet(context, processSession, oid, target, flowfile,
isNewFlowFileCreated);
} else if (SNMPStrategy.WALK == snmpStrategy) {
- performSnmpWalk(context, processSession, oid, flowfile);
+ performSnmpWalk(context, processSession, oid, target, flowfile,
isNewFlowFileCreated);
}
}
void performSnmpWalk(final ProcessContext context, final ProcessSession
processSession, final String oid,
- final FlowFile flowFile) {
+ final Target target, FlowFile flowFile, final boolean
isNewFlowFileCreated) {
+
+ if (oid != null) {
+ String prefixedOid = SNMPUtils.SNMP_PROP_PREFIX + oid;
+ flowFile = processSession.putAttribute(flowFile, prefixedOid, "");
+ }
+
try {
- if (flowFile != null) {
- performSnmpWalkWithFlowFile(processSession, flowFile);
+ final Optional<SNMPTreeResponse> optionalResponse =
snmpHandler.walk(flowFile.getAttributes(), target);
+ if (optionalResponse.isPresent()) {
+ final SNMPTreeResponse response = optionalResponse.get();
+ response.logErrors(getLogger());
+ flowFile = processSession.putAllAttributes(flowFile,
response.getAttributes());
+
processSession.getProvenanceReporter().modifyAttributes(flowFile,
response.getTargetAddress() + "/walk");
+ if (isNewFlowFileCreated) {
+ processSession.getProvenanceReporter().fetch(flowFile,
"/walk");
+ } else {
+ processSession.getProvenanceReporter().receive(flowFile,
"/walk");
+ }
Review Comment:
```suggestion
if (isNewFlowFileCreated) {
processSession.getProvenanceReporter().receive(flowFile,
"/walk");
} else {
processSession.getProvenanceReporter().fetch(flowFile,
"/walk");
}
```
##########
nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/processors/GetSNMP.java:
##########
@@ -148,97 +148,89 @@ public class GetSNMP extends AbstractSNMPProcessor {
@OnScheduled
public void init(final ProcessContext context) {
initSnmpManager(context);
- snmpHandler = new GetSNMPHandler(snmpResourceHandler);
+ snmpHandler = new GetSNMPHandler(snmpManager);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
processSession) {
final SNMPStrategy snmpStrategy =
SNMPStrategy.valueOf(context.getProperty(SNMP_STRATEGY).getValue());
final String oid = context.getProperty(OID).getValue();
- final FlowFile flowfile = processSession.get();
+ final boolean isNewFlowFileCreated;
+ FlowFile flowfile = processSession.get();
+ if (flowfile == null) {
+ isNewFlowFileCreated = true;
+ flowfile = processSession.create();
+ } else {
+ isNewFlowFileCreated = false;
+ }
+ final Target target =
factory.createTargetInstance(getTargetConfiguration(context, flowfile));
if (SNMPStrategy.GET == snmpStrategy) {
- performSnmpGet(context, processSession, oid, flowfile);
+ performSnmpGet(context, processSession, oid, target, flowfile,
isNewFlowFileCreated);
} else if (SNMPStrategy.WALK == snmpStrategy) {
- performSnmpWalk(context, processSession, oid, flowfile);
+ performSnmpWalk(context, processSession, oid, target, flowfile,
isNewFlowFileCreated);
}
}
void performSnmpWalk(final ProcessContext context, final ProcessSession
processSession, final String oid,
- final FlowFile flowFile) {
+ final Target target, FlowFile flowFile, final boolean
isNewFlowFileCreated) {
+
+ if (oid != null) {
+ String prefixedOid = SNMPUtils.SNMP_PROP_PREFIX + oid;
+ flowFile = processSession.putAttribute(flowFile, prefixedOid, "");
+ }
+
try {
- if (flowFile != null) {
- performSnmpWalkWithFlowFile(processSession, flowFile);
+ final Optional<SNMPTreeResponse> optionalResponse =
snmpHandler.walk(flowFile.getAttributes(), target);
+ if (optionalResponse.isPresent()) {
+ final SNMPTreeResponse response = optionalResponse.get();
+ response.logErrors(getLogger());
+ flowFile = processSession.putAllAttributes(flowFile,
response.getAttributes());
+
processSession.getProvenanceReporter().modifyAttributes(flowFile,
response.getTargetAddress() + "/walk");
Review Comment:
```suggestion
```
##########
nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/processors/AbstractSNMPProcessor.java:
##########
@@ -70,58 +86,80 @@ abstract class AbstractSNMPProcessor extends
AbstractProcessor {
.description("Port of the SNMP Agent.")
.required(true)
.defaultValue("161")
- .addValidator(StandardValidators.PORT_VALIDATOR)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
- protected volatile SNMPResourceHandler snmpResourceHandler;
+ protected volatile Snmp snmpManager;
+ protected volatile SNMPContext factory;
@OnScheduled
public void initSnmpManager(final ProcessContext context) {
final int version =
SNMPUtils.getVersion(context.getProperty(BasicProperties.SNMP_VERSION).getValue());
final SNMPConfiguration configuration;
- final String targetHost = getTargetHost(context);
- final String targetPort = getTargetPort(context);
configuration = SNMPConfiguration.builder()
- .setTargetHost(targetHost)
- .setTargetPort(targetPort)
-
.setRetries(context.getProperty(BasicProperties.SNMP_RETRIES).asInteger())
-
.setTimeoutInMs(context.getProperty(BasicProperties.SNMP_TIMEOUT).asInteger())
- .setVersion(version)
.setAuthProtocol(context.getProperty(V3SecurityProperties.SNMP_AUTH_PROTOCOL).getValue())
.setAuthPassphrase(context.getProperty(V3SecurityProperties.SNMP_AUTH_PASSWORD).getValue())
.setPrivacyProtocol(context.getProperty(V3SecurityProperties.SNMP_PRIVACY_PROTOCOL).getValue())
.setPrivacyPassphrase(context.getProperty(V3SecurityProperties.SNMP_PRIVACY_PASSWORD).getValue())
.setSecurityName(context.getProperty(V3SecurityProperties.SNMP_SECURITY_NAME).getValue())
+ .build();
+
+ factory = SNMPFactoryProvider.getFactory(version);
+ snmpManager = factory.createSnmpManagerInstance(configuration);
+ }
+
+ protected SNMPConfiguration getTargetConfiguration(final ProcessContext
context, final FlowFile flowFile) {
+ final int version =
SNMPUtils.getVersion(context.getProperty(BasicProperties.SNMP_VERSION).getValue());
+ final String targetHost = getTargetHost(context, flowFile);
+ final String targetPort = getTargetPort(context, flowFile);
+
+ return SNMPConfiguration.builder()
+ .setVersion(version)
+ .setTargetHost(targetHost)
+ .setTargetPort(targetPort)
+
.setRetries(context.getProperty(BasicProperties.SNMP_RETRIES).asInteger())
+
.setTimeoutInMs(context.getProperty(BasicProperties.SNMP_TIMEOUT).asInteger())
+
.setSecurityName(context.getProperty(V3SecurityProperties.SNMP_SECURITY_NAME).getValue())
.setSecurityLevel(context.getProperty(V3SecurityProperties.SNMP_SECURITY_LEVEL).getValue())
.setCommunityString(context.getProperty(BasicProperties.SNMP_COMMUNITY).getValue())
.build();
-
- snmpResourceHandler =
SNMPFactoryProvider.getFactory(version).createSNMPResourceHandler(configuration);
}
/**
* Closes the current SNMP mapping.
*/
@OnStopped
public void close() {
- if (snmpResourceHandler != null) {
- snmpResourceHandler.close();
- snmpResourceHandler = null;
+ try {
+ if (snmpManager.getUSM() != null) {
+ snmpManager.getUSM().removeAllUsers();
+ SecurityModels.getInstance().removeSecurityModel(new
Integer32(snmpManager.getUSM().getID()));
+ }
+ snmpManager.close();
+ } catch (IOException e) {
+ final String errorMessage = "Could not close SNMP manager.";
+ logger.error(errorMessage, e);
+ throw new ProcessException(errorMessage);
}
}
protected void handleResponse(final ProcessContext context, final
ProcessSession processSession, final FlowFile flowFile, final
SNMPSingleResponse response,
- final Relationship success, final
Relationship failure, final String provenanceAddress) {
+ final Relationship success, final
Relationship failure, final String provenanceAddress, final boolean
isNewFlowFileCreated) {
final SNMPResponseStatus snmpResponseStatus =
processResponse(response);
processSession.putAllAttributes(flowFile, response.getAttributes());
if (snmpResponseStatus.getErrorStatus() == ErrorStatus.FAILURE) {
getLogger().error("SNMP request failed, response error: " +
snmpResponseStatus.getErrorMessage());
- processSession.getProvenanceReporter().modifyAttributes(flowFile,
response.getTargetAddress() + provenanceAddress);
processSession.transfer(flowFile, failure);
context.yield();
} else {
processSession.getProvenanceReporter().modifyAttributes(flowFile,
response.getTargetAddress() + provenanceAddress);
+ if (isNewFlowFileCreated) {
+ processSession.getProvenanceReporter().fetch(flowFile,
response.getTargetAddress() + provenanceAddress);
+ } else {
+ processSession.getProvenanceReporter().receive(flowFile,
response.getTargetAddress() + provenanceAddress);
+ }
processSession.transfer(flowFile, success);
Review Comment:
```suggestion
if (isNewFlowFileCreated) {
processSession.getProvenanceReporter().receive(flowFile,
response.getTargetAddress() + provenanceAddress);
} else {
processSession.getProvenanceReporter().fetch(flowFile,
response.getTargetAddress() + provenanceAddress);
}
processSession.transfer(flowFile, success);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org