tpalfy commented on code in PR #8160:
URL: https://github.com/apache/nifi/pull/8160#discussion_r1465260947


##########
nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/processors/GetSNMP.java:
##########
@@ -148,97 +148,89 @@ public class GetSNMP extends AbstractSNMPProcessor {
     @OnScheduled
     public void init(final ProcessContext context) {
         initSnmpManager(context);
-        snmpHandler = new GetSNMPHandler(snmpResourceHandler);
+        snmpHandler = new GetSNMPHandler(snmpManager);
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
processSession) {
         final SNMPStrategy snmpStrategy = 
SNMPStrategy.valueOf(context.getProperty(SNMP_STRATEGY).getValue());
         final String oid = context.getProperty(OID).getValue();
-        final FlowFile flowfile = processSession.get();
+        final boolean isNewFlowFileCreated;
+        FlowFile flowfile = processSession.get();
+        if (flowfile == null) {
+            isNewFlowFileCreated = true;
+            flowfile = processSession.create();
+        } else {
+            isNewFlowFileCreated = false;
+        }
 
+        final Target target = 
factory.createTargetInstance(getTargetConfiguration(context, flowfile));
         if (SNMPStrategy.GET == snmpStrategy) {
-            performSnmpGet(context, processSession, oid, flowfile);
+            performSnmpGet(context, processSession, oid, target, flowfile, 
isNewFlowFileCreated);
         } else if (SNMPStrategy.WALK == snmpStrategy) {
-            performSnmpWalk(context, processSession, oid, flowfile);
+            performSnmpWalk(context, processSession, oid, target, flowfile, 
isNewFlowFileCreated);
         }
     }
 
     void performSnmpWalk(final ProcessContext context, final ProcessSession 
processSession, final String oid,
-                         final FlowFile flowFile) {
+                         final Target target, FlowFile flowFile, final boolean 
isNewFlowFileCreated) {
+
+        if (oid != null) {
+            String prefixedOid = SNMPUtils.SNMP_PROP_PREFIX + oid;
+            flowFile = processSession.putAttribute(flowFile, prefixedOid, "");
+        }
+
         try {
-            if (flowFile != null) {
-                performSnmpWalkWithFlowFile(processSession, flowFile);
+            final Optional<SNMPTreeResponse> optionalResponse = 
snmpHandler.walk(flowFile.getAttributes(), target);
+            if (optionalResponse.isPresent()) {
+                final SNMPTreeResponse response = optionalResponse.get();
+                response.logErrors(getLogger());
+                flowFile = processSession.putAllAttributes(flowFile, 
response.getAttributes());
+                
processSession.getProvenanceReporter().modifyAttributes(flowFile, 
response.getTargetAddress() + "/walk");
+                if (isNewFlowFileCreated) {
+                    processSession.getProvenanceReporter().fetch(flowFile, 
"/walk");
+                } else {
+                    processSession.getProvenanceReporter().receive(flowFile, 
"/walk");
+                }

Review Comment:
   ```suggestion
                   if (isNewFlowFileCreated) {
                       processSession.getProvenanceReporter().receive(flowFile, 
"/walk");
                   } else {
                       processSession.getProvenanceReporter().fetch(flowFile, 
"/walk");
                   }
   ```



##########
nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/processors/GetSNMP.java:
##########
@@ -148,97 +148,89 @@ public class GetSNMP extends AbstractSNMPProcessor {
     @OnScheduled
     public void init(final ProcessContext context) {
         initSnmpManager(context);
-        snmpHandler = new GetSNMPHandler(snmpResourceHandler);
+        snmpHandler = new GetSNMPHandler(snmpManager);
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
processSession) {
         final SNMPStrategy snmpStrategy = 
SNMPStrategy.valueOf(context.getProperty(SNMP_STRATEGY).getValue());
         final String oid = context.getProperty(OID).getValue();
-        final FlowFile flowfile = processSession.get();
+        final boolean isNewFlowFileCreated;
+        FlowFile flowfile = processSession.get();
+        if (flowfile == null) {
+            isNewFlowFileCreated = true;
+            flowfile = processSession.create();
+        } else {
+            isNewFlowFileCreated = false;
+        }
 
+        final Target target = 
factory.createTargetInstance(getTargetConfiguration(context, flowfile));
         if (SNMPStrategy.GET == snmpStrategy) {
-            performSnmpGet(context, processSession, oid, flowfile);
+            performSnmpGet(context, processSession, oid, target, flowfile, 
isNewFlowFileCreated);
         } else if (SNMPStrategy.WALK == snmpStrategy) {
-            performSnmpWalk(context, processSession, oid, flowfile);
+            performSnmpWalk(context, processSession, oid, target, flowfile, 
isNewFlowFileCreated);
         }
     }
 
     void performSnmpWalk(final ProcessContext context, final ProcessSession 
processSession, final String oid,
-                         final FlowFile flowFile) {
+                         final Target target, FlowFile flowFile, final boolean 
isNewFlowFileCreated) {
+
+        if (oid != null) {
+            String prefixedOid = SNMPUtils.SNMP_PROP_PREFIX + oid;
+            flowFile = processSession.putAttribute(flowFile, prefixedOid, "");
+        }
+
         try {
-            if (flowFile != null) {
-                performSnmpWalkWithFlowFile(processSession, flowFile);
+            final Optional<SNMPTreeResponse> optionalResponse = 
snmpHandler.walk(flowFile.getAttributes(), target);
+            if (optionalResponse.isPresent()) {
+                final SNMPTreeResponse response = optionalResponse.get();
+                response.logErrors(getLogger());
+                flowFile = processSession.putAllAttributes(flowFile, 
response.getAttributes());
+                
processSession.getProvenanceReporter().modifyAttributes(flowFile, 
response.getTargetAddress() + "/walk");

Review Comment:
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/processors/AbstractSNMPProcessor.java:
##########
@@ -70,58 +86,80 @@ abstract class AbstractSNMPProcessor extends 
AbstractProcessor {
             .description("Port of the SNMP Agent.")
             .required(true)
             .defaultValue("161")
-            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
-    protected volatile SNMPResourceHandler snmpResourceHandler;
+    protected volatile Snmp snmpManager;
+    protected volatile SNMPContext factory;
 
     @OnScheduled
     public void initSnmpManager(final ProcessContext context) {
         final int version = 
SNMPUtils.getVersion(context.getProperty(BasicProperties.SNMP_VERSION).getValue());
         final SNMPConfiguration configuration;
-        final String targetHost = getTargetHost(context);
-        final String targetPort = getTargetPort(context);
 
         configuration = SNMPConfiguration.builder()
-                .setTargetHost(targetHost)
-                .setTargetPort(targetPort)
-                
.setRetries(context.getProperty(BasicProperties.SNMP_RETRIES).asInteger())
-                
.setTimeoutInMs(context.getProperty(BasicProperties.SNMP_TIMEOUT).asInteger())
-                .setVersion(version)
                 
.setAuthProtocol(context.getProperty(V3SecurityProperties.SNMP_AUTH_PROTOCOL).getValue())
                 
.setAuthPassphrase(context.getProperty(V3SecurityProperties.SNMP_AUTH_PASSWORD).getValue())
                 
.setPrivacyProtocol(context.getProperty(V3SecurityProperties.SNMP_PRIVACY_PROTOCOL).getValue())
                 
.setPrivacyPassphrase(context.getProperty(V3SecurityProperties.SNMP_PRIVACY_PASSWORD).getValue())
                 
.setSecurityName(context.getProperty(V3SecurityProperties.SNMP_SECURITY_NAME).getValue())
+                .build();
+
+        factory = SNMPFactoryProvider.getFactory(version);
+        snmpManager = factory.createSnmpManagerInstance(configuration);
+    }
+
+    protected SNMPConfiguration getTargetConfiguration(final ProcessContext 
context, final FlowFile flowFile) {
+        final int version = 
SNMPUtils.getVersion(context.getProperty(BasicProperties.SNMP_VERSION).getValue());
+        final String targetHost = getTargetHost(context, flowFile);
+        final String targetPort = getTargetPort(context, flowFile);
+
+        return SNMPConfiguration.builder()
+                .setVersion(version)
+                .setTargetHost(targetHost)
+                .setTargetPort(targetPort)
+                
.setRetries(context.getProperty(BasicProperties.SNMP_RETRIES).asInteger())
+                
.setTimeoutInMs(context.getProperty(BasicProperties.SNMP_TIMEOUT).asInteger())
+                
.setSecurityName(context.getProperty(V3SecurityProperties.SNMP_SECURITY_NAME).getValue())
                 
.setSecurityLevel(context.getProperty(V3SecurityProperties.SNMP_SECURITY_LEVEL).getValue())
                 
.setCommunityString(context.getProperty(BasicProperties.SNMP_COMMUNITY).getValue())
                 .build();
-
-        snmpResourceHandler = 
SNMPFactoryProvider.getFactory(version).createSNMPResourceHandler(configuration);
     }
 
     /**
      * Closes the current SNMP mapping.
      */
     @OnStopped
     public void close() {
-        if (snmpResourceHandler != null) {
-            snmpResourceHandler.close();
-            snmpResourceHandler = null;
+        try {
+            if (snmpManager.getUSM() != null) {
+                snmpManager.getUSM().removeAllUsers();
+                SecurityModels.getInstance().removeSecurityModel(new 
Integer32(snmpManager.getUSM().getID()));
+            }
+            snmpManager.close();
+        } catch (IOException e) {
+            final String errorMessage = "Could not close SNMP manager.";
+            logger.error(errorMessage, e);
+            throw new ProcessException(errorMessage);
         }
     }
 
     protected void handleResponse(final ProcessContext context, final 
ProcessSession processSession, final FlowFile flowFile, final 
SNMPSingleResponse response,
-                                  final Relationship success, final 
Relationship failure, final String provenanceAddress) {
+                                  final Relationship success, final 
Relationship failure, final String provenanceAddress, final boolean 
isNewFlowFileCreated) {
         final SNMPResponseStatus snmpResponseStatus = 
processResponse(response);
         processSession.putAllAttributes(flowFile, response.getAttributes());
         if (snmpResponseStatus.getErrorStatus() == ErrorStatus.FAILURE) {
             getLogger().error("SNMP request failed, response error: " + 
snmpResponseStatus.getErrorMessage());
-            processSession.getProvenanceReporter().modifyAttributes(flowFile, 
response.getTargetAddress() + provenanceAddress);
             processSession.transfer(flowFile, failure);
             context.yield();
         } else {
             processSession.getProvenanceReporter().modifyAttributes(flowFile, 
response.getTargetAddress() + provenanceAddress);
+            if (isNewFlowFileCreated) {
+                processSession.getProvenanceReporter().fetch(flowFile, 
response.getTargetAddress() + provenanceAddress);
+            } else {
+                processSession.getProvenanceReporter().receive(flowFile, 
response.getTargetAddress() + provenanceAddress);
+            }
             processSession.transfer(flowFile, success);

Review Comment:
   ```suggestion
               if (isNewFlowFileCreated) {
                   processSession.getProvenanceReporter().receive(flowFile, 
response.getTargetAddress() + provenanceAddress);
               } else {
                   processSession.getProvenanceReporter().fetch(flowFile, 
response.getTargetAddress() + provenanceAddress);
               }
               processSession.transfer(flowFile, success);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to