Repository: nifi
Updated Branches:
  refs/heads/master 93150d3ef -> 5af6eb17b


NIFI-3162: Audit RPG and RPG port config changes.

- Added configure audits for Transport Protocol, HTTP Proxy Server Host,
  Port, User and Password in RemoteProcessGroup configuration
- Added configure audits for enabling/disabling individual remote port
- Added configure audits for Concurrent Tasks and Compressed in Remote
  Port configuration
- This closes #1476


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5af6eb17
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5af6eb17
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5af6eb17

Branch: refs/heads/master
Commit: 5af6eb17b0c1be9aca201507769f6b530d5ba0f6
Parents: 93150d3
Author: Koji Kawamura <[email protected]>
Authored: Tue Feb 7 10:11:30 2017 +0900
Committer: Matt Gilman <[email protected]>
Committed: Tue Feb 7 16:22:51 2017 -0500

----------------------------------------------------------------------
 .../nifi/audit/RemoteProcessGroupAuditor.java   | 421 +++++++++-----
 .../audit/TestRemoteProcessGroupAuditor.java    | 556 +++++++++++++++++++
 2 files changed, 830 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5af6eb17/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java
index e119437..65995f5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java
@@ -22,12 +22,13 @@ import org.apache.nifi.action.FlowChangeAction;
 import org.apache.nifi.action.Operation;
 import 
org.apache.nifi.action.component.details.FlowChangeRemoteProcessGroupDetails;
 import org.apache.nifi.action.details.ActionDetails;
+import org.apache.nifi.action.details.ConfigureDetails;
 import org.apache.nifi.action.details.FlowChangeConfigureDetails;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.authorization.user.NiFiUser;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
+import org.apache.nifi.util.StringUtils;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
 import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
@@ -38,10 +39,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.apache.nifi.web.api.dto.DtoFactory.SENSITIVE_VALUE_MASK;
 
 /**
  * Audits remote process group creation/removal and configuration changes.
@@ -51,6 +59,44 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
 
     private static final Logger logger = 
LoggerFactory.getLogger(RemoteProcessGroupAuditor.class);
 
+    // Proxy settings should be able to be null cleared, so the necessity of 
checking those properties depend on
+    // whether transport protocol is specified.
+    // See StandardRemoteProcessGroupDAO.updateRemoteProcessGroup for detail.
+    private static final Function<RemoteProcessGroupDTO, Boolean> 
IS_TRANSPORT_PROTOCOL_SET = dto -> dto.getTransportProtocol() != null;
+
+    private static final List<ConfigurationRecorder<RemoteProcessGroup, 
RemoteProcessGroupDTO>> CONFIG_RECORDERS = Arrays.asList(
+            new ConfigurationRecorder<RemoteProcessGroup, 
RemoteProcessGroupDTO>("Communications Timeout",
+                    dto -> dto.getCommunicationsTimeout() != null, 
RemoteProcessGroup::getCommunicationsTimeout),
+            new ConfigurationRecorder<RemoteProcessGroup, 
RemoteProcessGroupDTO>("Yield Duration",
+                    dto -> dto.getYieldDuration() != null, 
RemoteProcessGroup::getYieldDuration),
+            new ConfigurationRecorder<RemoteProcessGroup, 
RemoteProcessGroupDTO>("Transport Protocol",
+                    IS_TRANSPORT_PROTOCOL_SET, rpg -> 
rpg.getTransportProtocol().name()),
+            new ConfigurationRecorder<>("Proxy Host",
+                    IS_TRANSPORT_PROTOCOL_SET, 
RemoteProcessGroup::getProxyHost),
+            new ConfigurationRecorder<>("Proxy Port",
+                    IS_TRANSPORT_PROTOCOL_SET, 
RemoteProcessGroup::getProxyPort),
+            new ConfigurationRecorder<>("Proxy User",
+                    IS_TRANSPORT_PROTOCOL_SET, 
RemoteProcessGroup::getProxyUser),
+            new ConfigurationRecorder<>("Proxy Password",
+                    IS_TRANSPORT_PROTOCOL_SET, 
RemoteProcessGroup::getProxyPassword)
+                    .setConvertRawValue(v -> StringUtils.isEmpty(v) ? "" : 
SENSITIVE_VALUE_MASK)
+    );
+
+
+    private static final BiFunction<RemoteGroupPort, String, String> 
PORT_NAME_CONVERT = (updated, name) -> updated.getName() + "." + name;
+    private static final List<ConfigurationRecorder<RemoteGroupPort, 
RemoteProcessGroupPortDTO>> PORT_CONFIG_RECORDERS = Arrays.asList(
+            new ConfigurationRecorder<RemoteGroupPort, 
RemoteProcessGroupPortDTO>("Transmission",
+                    dto -> dto.isTransmitting() != null, 
RemoteGroupPort::isRunning)
+                    .setConvertName(PORT_NAME_CONVERT)
+                    .setConvertRawValue(v -> Boolean.valueOf(v) ? "enabled" : 
"disabled"),
+            new ConfigurationRecorder<RemoteGroupPort, 
RemoteProcessGroupPortDTO>("Concurrent Tasks",
+                    dto -> dto.getConcurrentlySchedulableTaskCount() != null, 
RemoteGroupPort::getMaxConcurrentTasks)
+                    .setConvertName(PORT_NAME_CONVERT),
+            new ConfigurationRecorder<RemoteGroupPort, 
RemoteProcessGroupPortDTO>("Compressed",
+                    dto -> dto.getUseCompression() != null, 
RemoteGroupPort::isUseCompression)
+                    .setConvertName(PORT_NAME_CONVERT)
+    );
+
     /**
      * Audits the creation of remote process groups via 
createRemoteProcessGroup().
      *
@@ -80,9 +126,112 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor 
{
     }
 
     /**
+     * Provides a higher order functions to compute Configuration audit events.
+     * @param <CMP> Class of a component, such as RemoteProcessGroup or 
RemoteGroupPort
+     * @param <DTO> Class of a DTO, such as RemoteProcessGroupDTO or 
RemoteProcessGroupPortDTO
+     */
+    private static class ConfigurationRecorder<CMP, DTO> {
+        final String name;
+        final Function<DTO, Boolean> hasInput;
+        final Function<CMP, Object> getValue;
+        private Function<String, String> convertRawValue;
+        private BiFunction<CMP, String, String> convertName;
+
+        /**
+         * Create a recorder for a configuration property.
+         * @param name name of the target property
+         * @param hasInput a function that returns whether the property is 
being updated by a request
+         * @param getValue a function that returns value of the property
+         */
+        private ConfigurationRecorder(String name, Function<DTO, Boolean> 
hasInput, Function<CMP, Object> getValue) {
+            this.name = name;
+            this.hasInput = hasInput;
+            this.getValue = getValue;
+        }
+
+        private String convertRawValue(final String value) {
+            return convertRawValue != null ? convertRawValue.apply(value) : 
value;
+        }
+
+        /**
+         * If a property value needs to be converted for audit record, e.g. 
sensitive values,
+         * use this method to specify a function to convert raw value.
+         * @param convertRawValue a function to convert string representation 
of a property value
+         * @return converted value
+         */
+        private ConfigurationRecorder<CMP, DTO> setConvertRawValue(final 
Function<String, String> convertRawValue) {
+            this.convertRawValue = convertRawValue;
+            return this;
+        }
+
+        /**
+         * If a property name needs to be decorated depends on other values in 
a context,
+         * use this method to specify a function to convert name.
+         * @param convertName a function to convert name of a property
+         * @return converted name
+         */
+        private ConfigurationRecorder<CMP, DTO> setConvertName(final 
BiFunction<CMP, String, String> convertName) {
+            this.convertName = convertName;
+            return this;
+        }
+
+        private ConfigureDetails checkConfigured(final DTO input,
+                                                 final CMP updated,
+                                                 final Object previousValue) {
+
+            final Object updatedValue = getValue.apply(updated);
+            // Convert null to empty String to avoid NullPointerException.
+            final String updatedStr = updatedValue != null ? 
updatedValue.toString() : "";
+            final String previousStr = previousValue != null ? 
previousValue.toString() : "";
+            if (hasInput.apply(input) && !updatedStr.equals(previousStr)) {
+
+                FlowChangeConfigureDetails configDetails = new 
FlowChangeConfigureDetails();
+                configDetails.setName(convertName != null ? 
convertName.apply(updated, name) : name);
+                configDetails.setPreviousValue(convertRawValue(previousStr));
+                configDetails.setValue(convertRawValue(updatedStr));
+                return configDetails;
+            }
+            return null;
+        }
+
+        /**
+         * Capture values before a component to be updated. This method should 
be called before proceeding a joint point.
+         * @param recorders list of ConfigurationRecorder
+         * @param component a target component to capture
+         * @param <C> Class of the target component
+         * @param <D> DTO class of the target component
+         * @return captured values keyed with its name
+         */
+        private static <C, D> Map<String, Object> capturePreviousValues(final 
List<ConfigurationRecorder<C, D>> recorders, final C component) {
+            final Map<String, Object> previousValues = new 
HashMap<>(recorders.size());
+            recorders.forEach(r -> previousValues.put(r.name, 
r.getValue.apply(component)));
+            return previousValues;
+        }
+
+        /**
+         * Generate ActionDetails for properties those have been updated.
+         * This method should be called after proceeding a joint point with an 
updated component.
+         * @param recorders list of ConfigurationRecorder
+         * @param dto DTO instance containing requested values
+         * @param updatedComponent a component instance that is updated by 
corresponding DAO
+         * @param previousValues previous property values before being updated
+         * @param details a Collection to accumulate action details generated
+         * @param <C> Class of the target component
+         * @param <D> DTO class of the target component
+         */
+        private static <C, D> void checkConfigured(final 
List<ConfigurationRecorder<C, D>> recorders, final D dto, final C 
updatedComponent,
+                                                   final Map<String, Object> 
previousValues, final Collection<ActionDetails> details) {
+            recorders.stream()
+                    .map(r -> r.checkConfigured(dto, updatedComponent, 
previousValues.get(r.name)))
+                    .filter(Objects::nonNull).forEach(d -> details.add(d));
+
+        }
+    }
+
+    /**
      * Audits the update of remote process group configuration.
      *
-     * @param proceedingJoinPoint join point
+     * @param proceedingJoinPoint   join point
      * @param remoteProcessGroupDTO dto
      * @param remoteProcessGroupDAO dao
      * @return group
@@ -98,18 +247,7 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
 
         // record the current value of this remoteProcessGroups configuration 
for comparisons later
         final boolean transmissionState = remoteProcessGroup.isTransmitting();
-        final String communicationsTimeout = 
remoteProcessGroup.getCommunicationsTimeout();
-        final String yieldDuration = remoteProcessGroup.getYieldDuration();
-        final Map<String, Integer> concurrentTasks = new HashMap<>();
-        final Map<String, Boolean> compression = new HashMap<>();
-        for (final RemoteGroupPort remotePort : 
remoteProcessGroup.getInputPorts()) {
-            concurrentTasks.put(remotePort.getIdentifier(), 
remotePort.getMaxConcurrentTasks());
-            compression.put(remotePort.getIdentifier(), 
remotePort.isUseCompression());
-        }
-        for (final RemoteGroupPort remotePort : 
remoteProcessGroup.getOutputPorts()) {
-            concurrentTasks.put(remotePort.getIdentifier(), 
remotePort.getMaxConcurrentTasks());
-            compression.put(remotePort.getIdentifier(), 
remotePort.isUseCompression());
-        }
+        final Map<String, Object> previousValues = 
ConfigurationRecorder.capturePreviousValues(CONFIG_RECORDERS, 
remoteProcessGroup);
 
         // perform the underlying operation
         final RemoteProcessGroup updatedRemoteProcessGroup = 
(RemoteProcessGroup) proceedingJoinPoint.proceed();
@@ -119,140 +257,26 @@ public class RemoteProcessGroupAuditor extends 
NiFiAuditor {
 
         // ensure the user was found
         if (user != null) {
-            Collection<ActionDetails> details = new ArrayList<>();
-
-            // see if the communications timeout has changed
-            if (remoteProcessGroupDTO.getCommunicationsTimeout() != null && 
!updatedRemoteProcessGroup.getCommunicationsTimeout().equals(communicationsTimeout))
 {
-                // create the config details
-                FlowChangeConfigureDetails configDetails = new 
FlowChangeConfigureDetails();
-                configDetails.setName("Communications Timeout");
-                
configDetails.setValue(updatedRemoteProcessGroup.getCommunicationsTimeout());
-                configDetails.setPreviousValue(communicationsTimeout);
-
-                details.add(configDetails);
-            }
-
-            // see if the yield duration has changed
-            if (remoteProcessGroupDTO.getYieldDuration() != null && 
!updatedRemoteProcessGroup.getYieldDuration().equals(yieldDuration)) {
-                // create the config details
-                FlowChangeConfigureDetails configDetails = new 
FlowChangeConfigureDetails();
-                configDetails.setName("Yield Duration");
-                
configDetails.setValue(updatedRemoteProcessGroup.getYieldDuration());
-                configDetails.setPreviousValue(yieldDuration);
+            final Collection<ActionDetails> details = new ArrayList<>();
 
-                details.add(configDetails);
-            }
+            // see if any property has changed
+            ConfigurationRecorder.checkConfigured(CONFIG_RECORDERS, 
remoteProcessGroupDTO, updatedRemoteProcessGroup, previousValues, details);
 
-            // see if the contents of this remote process group are possibly 
changing
-            if (remoteProcessGroupDTO.getContents() != null) {
-                final RemoteProcessGroupContentsDTO contents = 
remoteProcessGroupDTO.getContents();
-
-                // see if any input port configuration is changing
-                if (contents.getInputPorts() != null) {
-                    for (final RemoteProcessGroupPortDTO remotePortDTO : 
contents.getInputPorts()) {
-                        final RemoteGroupPort remotePort = 
updatedRemoteProcessGroup.getInputPort(remotePortDTO.getId());
-
-                        // if this port has been removed, ignore the 
configuration change for auditing purposes
-                        if (remotePort == null) {
-                            continue;
-                        }
-
-                        // if a new concurrent task count is specified
-                        if 
(remotePortDTO.getConcurrentlySchedulableTaskCount() != null) {
-                            // see if the concurrent tasks has changed
-                            final Integer previousConcurrentTasks = 
concurrentTasks.get(remotePortDTO.getId());
-                            if (previousConcurrentTasks != null && 
remotePort.getMaxConcurrentTasks() != previousConcurrentTasks) {
-                                // create the config details
-                                FlowChangeConfigureDetails 
concurrentTasksDetails = new FlowChangeConfigureDetails();
-                                concurrentTasksDetails.setName("Concurrent 
Tasks");
-                                
concurrentTasksDetails.setValue(String.valueOf(remotePort.getMaxConcurrentTasks()));
-                                
concurrentTasksDetails.setPreviousValue(String.valueOf(previousConcurrentTasks));
-
-                                details.add(concurrentTasksDetails);
-                            }
-                        }
-
-                        // if a new compressed flag is specified
-                        if (remotePortDTO.getUseCompression() != null) {
-                            // see if the compression has changed
-                            final Boolean previousCompression = 
compression.get(remotePortDTO.getId());
-                            if (previousCompression != null && 
remotePort.isUseCompression() != previousCompression) {
-                                // create the config details
-                                FlowChangeConfigureDetails compressionDetails 
= new FlowChangeConfigureDetails();
-                                compressionDetails.setName("Compressed");
-                                
compressionDetails.setValue(String.valueOf(remotePort.isUseCompression()));
-                                
compressionDetails.setPreviousValue(String.valueOf(previousCompression));
-
-                                details.add(compressionDetails);
-                            }
-                        }
-                    }
-                }
-
-                // see if any output port configuration is changing
-                if (contents.getOutputPorts() != null) {
-                    for (final RemoteProcessGroupPortDTO remotePortDTO : 
contents.getOutputPorts()) {
-                        final RemoteGroupPort remotePort = 
updatedRemoteProcessGroup.getOutputPort(remotePortDTO.getId());
-
-                        // if this port has been removed, ignore the 
configuration change for auditing purposes
-                        if (remotePort == null) {
-                            continue;
-                        }
-
-                        // if a new concurrent task count is specified
-                        if 
(remotePortDTO.getConcurrentlySchedulableTaskCount() != null) {
-                            // see if the concurrent tasks has changed
-                            final Integer previousConcurrentTasks = 
concurrentTasks.get(remotePortDTO.getId());
-                            if (previousConcurrentTasks != null && 
remotePort.getMaxConcurrentTasks() != previousConcurrentTasks) {
-                                // create the config details
-                                FlowChangeConfigureDetails 
concurrentTasksDetails = new FlowChangeConfigureDetails();
-                                concurrentTasksDetails.setName("Concurrent 
Tasks");
-                                
concurrentTasksDetails.setValue(String.valueOf(remotePort.getMaxConcurrentTasks()));
-                                
concurrentTasksDetails.setPreviousValue(String.valueOf(previousConcurrentTasks));
-
-                                details.add(concurrentTasksDetails);
-                            }
-                        }
-
-                        // if a new compressed flag is specified
-                        if (remotePortDTO.getUseCompression() != null) {
-                            // see if the compression has changed
-                            final Boolean previousCompression = 
compression.get(remotePortDTO.getId());
-                            if (previousCompression != null && 
remotePort.isUseCompression() != previousCompression) {
-                                // create the config details
-                                FlowChangeConfigureDetails compressionDetails 
= new FlowChangeConfigureDetails();
-                                compressionDetails.setName("Compressed");
-                                
compressionDetails.setValue(String.valueOf(remotePort.isUseCompression()));
-                                
compressionDetails.setPreviousValue(String.valueOf(previousCompression));
-
-                                details.add(compressionDetails);
-                            }
-                        }
-                    }
-                }
-            }
-
-            Collection<Action> actions = new ArrayList<>();
+            final Date timestamp = new Date();
+            final Collection<Action> actions = new ArrayList<>();
 
             // create the remote process group details
-            FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = 
new FlowChangeRemoteProcessGroupDetails();
-            
remoteProcessGroupDetails.setUri(remoteProcessGroup.getTargetUri());
+            final FlowChangeRemoteProcessGroupDetails 
remoteProcessGroupDetails = createFlowChangeDetails(remoteProcessGroup);
 
             // save the actions if necessary
             if (!details.isEmpty()) {
-                Date timestamp = new Date();
 
                 // create the actions
                 for (ActionDetails detail : details) {
-                    // create the port action for updating the name
-                    FlowChangeAction remoteProcessGroupAction = new 
FlowChangeAction();
-                    
remoteProcessGroupAction.setUserIdentity(user.getIdentity());
+                    // create a configure action for each updated property
+                    FlowChangeAction remoteProcessGroupAction = 
createFlowChangeAction(user, timestamp,
+                            updatedRemoteProcessGroup, 
remoteProcessGroupDetails);
                     remoteProcessGroupAction.setOperation(Operation.Configure);
-                    remoteProcessGroupAction.setTimestamp(timestamp);
-                    
remoteProcessGroupAction.setSourceId(updatedRemoteProcessGroup.getIdentifier());
-                    
remoteProcessGroupAction.setSourceName(updatedRemoteProcessGroup.getName());
-                    
remoteProcessGroupAction.setSourceType(Component.RemoteProcessGroup);
-                    
remoteProcessGroupAction.setComponentDetails(remoteProcessGroupDetails);
                     remoteProcessGroupAction.setActionDetails(detail);
 
                     actions.add(remoteProcessGroupAction);
@@ -264,14 +288,9 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor 
{
 
             // determine if the running state has changed
             if (transmissionState != updatedTransmissionState) {
-                // create a processor action
-                FlowChangeAction remoteProcessGroupAction = new 
FlowChangeAction();
-                remoteProcessGroupAction.setUserIdentity(user.getIdentity());
-                remoteProcessGroupAction.setTimestamp(new Date());
-                
remoteProcessGroupAction.setSourceId(updatedRemoteProcessGroup.getIdentifier());
-                
remoteProcessGroupAction.setSourceName(updatedRemoteProcessGroup.getName());
-                
remoteProcessGroupAction.setSourceType(Component.RemoteProcessGroup);
-                
remoteProcessGroupAction.setComponentDetails(remoteProcessGroupDetails);
+                // create a remote process group action
+                FlowChangeAction remoteProcessGroupAction = 
createFlowChangeAction(user, timestamp,
+                        updatedRemoteProcessGroup, remoteProcessGroupDetails);
 
                 // set the operation accordingly
                 if (updatedTransmissionState) {
@@ -292,6 +311,20 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor 
{
         return updatedRemoteProcessGroup;
     }
 
+    private FlowChangeAction createFlowChangeAction(final NiFiUser user, final 
Date timestamp,
+                                                    final RemoteProcessGroup 
remoteProcessGroup,
+                                                    final 
FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails) {
+
+        FlowChangeAction remoteProcessGroupAction = new FlowChangeAction();
+        remoteProcessGroupAction.setUserIdentity(user.getIdentity());
+        remoteProcessGroupAction.setTimestamp(timestamp);
+        
remoteProcessGroupAction.setSourceId(remoteProcessGroup.getIdentifier());
+        remoteProcessGroupAction.setSourceName(remoteProcessGroup.getName());
+        remoteProcessGroupAction.setSourceType(Component.RemoteProcessGroup);
+        
remoteProcessGroupAction.setComponentDetails(remoteProcessGroupDetails);
+        return remoteProcessGroupAction;
+    }
+
     /**
      * Audits the removal of a process group via deleteProcessGroup().
      *
@@ -320,6 +353,101 @@ public class RemoteProcessGroupAuditor extends 
NiFiAuditor {
         }
     }
 
+    private RemoteGroupPort 
auditUpdateProcessGroupPortConfiguration(ProceedingJoinPoint 
proceedingJoinPoint, RemoteProcessGroupPortDTO remoteProcessGroupPortDto, 
RemoteProcessGroup remoteProcessGroup, RemoteGroupPort remoteProcessGroupPort) 
throws Throwable {
+        final Map<String, Object> previousValues = 
ConfigurationRecorder.capturePreviousValues(PORT_CONFIG_RECORDERS, 
remoteProcessGroupPort);
+
+        // perform the underlying operation
+        final RemoteGroupPort updatedRemoteProcessGroupPort = 
(RemoteGroupPort) proceedingJoinPoint.proceed();
+
+        // get the current user
+        NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+        if (user != null) {
+            final Collection<ActionDetails> details = new ArrayList<>();
+
+            // see if any property has changed
+            ConfigurationRecorder.checkConfigured(PORT_CONFIG_RECORDERS, 
remoteProcessGroupPortDto, updatedRemoteProcessGroupPort, previousValues, 
details);
+
+            final Date timestamp = new Date();
+            final Collection<Action> actions = new ArrayList<>();
+
+            // create the remote process group details
+            final FlowChangeRemoteProcessGroupDetails 
remoteProcessGroupDetails = createFlowChangeDetails(remoteProcessGroup);
+
+            // save the actions if necessary
+            for (ActionDetails detail : details) {
+                // create a configure action for each updated property
+                FlowChangeAction remoteProcessGroupAction = 
createFlowChangeAction(user, timestamp,
+                        remoteProcessGroup, remoteProcessGroupDetails);
+                remoteProcessGroupAction.setOperation(Operation.Configure);
+                remoteProcessGroupAction.setActionDetails(detail);
+
+                actions.add(remoteProcessGroupAction);
+            }
+
+            // ensure there are actions to record
+            if (!actions.isEmpty()) {
+                // save the actions
+                saveActions(actions, logger);
+            }
+        }
+
+        return updatedRemoteProcessGroupPort;
+    }
+
+    /**
+     * Audits the update of remote process group input port configuration.
+     *
+     * @param proceedingJoinPoint       join point
+     * @param remoteProcessGroupPortDto dto
+     * @param remoteProcessGroupDAO     dao
+     * @return group
+     * @throws Throwable ex
+     */
+    @Around("within(org.apache.nifi.web.dao.RemoteProcessGroupDAO+) && "
+            + "execution(org.apache.nifi.remote.RemoteGroupPort 
updateRemoteProcessGroupInputPort(java.lang.String, 
org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO)) && "
+            + "args(remoteProcessGroupId, remoteProcessGroupPortDto) && "
+            + "target(remoteProcessGroupDAO)")
+    public RemoteGroupPort auditUpdateProcessGroupInputPortConfiguration(
+            ProceedingJoinPoint proceedingJoinPoint, String 
remoteProcessGroupId,
+            RemoteProcessGroupPortDTO remoteProcessGroupPortDto, 
RemoteProcessGroupDAO remoteProcessGroupDAO) throws Throwable {
+
+        final RemoteProcessGroup remoteProcessGroup = 
remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
+        final RemoteGroupPort remoteProcessGroupPort = 
remoteProcessGroup.getInputPort(remoteProcessGroupPortDto.getId());
+
+        return auditUpdateProcessGroupPortConfiguration(proceedingJoinPoint, 
remoteProcessGroupPortDto, remoteProcessGroup, remoteProcessGroupPort);
+    }
+
+    /**
+     * Audits the update of remote process group output port configuration.
+     *
+     * @param proceedingJoinPoint join point
+     * @param remoteProcessGroupPortDto dto
+     * @param remoteProcessGroupDAO dao
+     * @return group
+     * @throws Throwable ex
+     */
+    @Around("within(org.apache.nifi.web.dao.RemoteProcessGroupDAO+) && "
+            + "execution(org.apache.nifi.remote.RemoteGroupPort 
updateRemoteProcessGroupOutputPort(java.lang.String, 
org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO)) && "
+            + "args(remoteProcessGroupId, remoteProcessGroupPortDto) && "
+            + "target(remoteProcessGroupDAO)")
+    public RemoteGroupPort auditUpdateProcessGroupOutputPortConfiguration(
+            ProceedingJoinPoint proceedingJoinPoint, String 
remoteProcessGroupId,
+            RemoteProcessGroupPortDTO remoteProcessGroupPortDto, 
RemoteProcessGroupDAO remoteProcessGroupDAO) throws Throwable {
+
+        final RemoteProcessGroup remoteProcessGroup = 
remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
+        final RemoteGroupPort remoteProcessGroupPort = 
remoteProcessGroup.getOutputPort(remoteProcessGroupPortDto.getId());
+
+        return auditUpdateProcessGroupPortConfiguration(proceedingJoinPoint, 
remoteProcessGroupPortDto, remoteProcessGroup, remoteProcessGroupPort);
+    }
+
+    private FlowChangeRemoteProcessGroupDetails 
createFlowChangeDetails(RemoteProcessGroup remoteProcessGroup) {
+        FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = new 
FlowChangeRemoteProcessGroupDetails();
+        remoteProcessGroupDetails.setUri(remoteProcessGroup.getTargetUri());
+        return remoteProcessGroupDetails;
+    }
+
+
     /**
      * Generates an audit record for the specified remote process group.
      *
@@ -348,8 +476,7 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
         // ensure the user was found
         if (user != null) {
             // create the remote process group details
-            FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = 
new FlowChangeRemoteProcessGroupDetails();
-            
remoteProcessGroupDetails.setUri(remoteProcessGroup.getTargetUri().toString());
+            FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = 
createFlowChangeDetails(remoteProcessGroup);
 
             // create the remote process group action
             action = new FlowChangeAction();

http://git-wip-us.apache.org/repos/asf/nifi/blob/5af6eb17/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java
new file mode 100644
index 0000000..725e4d4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.audit;
+
+import org.apache.nifi.action.Action;
+import org.apache.nifi.action.Operation;
+import org.apache.nifi.action.component.details.RemoteProcessGroupDetails;
+import org.apache.nifi.action.details.ActionDetails;
+import org.apache.nifi.action.details.ConfigureDetails;
+import org.apache.nifi.admin.service.AuditService;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.authorization.user.NiFiUserDetails;
+import org.apache.nifi.authorization.user.StandardNiFiUser;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContext;
+import org.springframework.security.core.context.SecurityContextHolder;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.nifi.web.api.dto.DtoFactory.SENSITIVE_VALUE_MASK;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestRemoteProcessGroupAuditor {
+
+    @Before
+    public void setup() {
+
+        final SecurityContext securityContext = 
SecurityContextHolder.getContext();
+        final Authentication authentication = mock(Authentication.class);
+        securityContext.setAuthentication(authentication);
+        final NiFiUser user = new StandardNiFiUser("user-id");
+        final NiFiUserDetails userDetail = new NiFiUserDetails(user);
+        when(authentication.getPrincipal()).thenReturn(userDetail);
+
+    }
+
+    @SuppressWarnings("unchecked")
+    private Collection<Action> 
updateProcessGroupConfiguration(RemoteProcessGroupDTO inputRPGDTO, 
RemoteProcessGroup existingRPG) throws Throwable {
+        final RemoteProcessGroupAuditor auditor = new 
RemoteProcessGroupAuditor();
+        final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
+        final String remoteProcessGroupId = "remote-process-group-id";
+        inputRPGDTO.setId(remoteProcessGroupId);
+
+        final String targetUrl = "http://localhost:8080/nifi";;
+        when(existingRPG.getTargetUri()).thenReturn(targetUrl);
+
+        final RemoteProcessGroupDAO remoteProcessGroupDAO = 
mock(RemoteProcessGroupDAO.class);
+        when(remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId))
+                .thenReturn(existingRPG);
+
+        // Setup updatedRPG mock based on inputRPGDTO.
+        final RemoteProcessGroup updatedRPG = mock(RemoteProcessGroup.class);
+        when(updatedRPG.getIdentifier()).thenReturn(remoteProcessGroupId);
+        
when(updatedRPG.isTransmitting()).thenReturn(inputRPGDTO.isTransmitting());
+        
when(updatedRPG.getCommunicationsTimeout()).thenReturn(inputRPGDTO.getCommunicationsTimeout());
+        
when(updatedRPG.getYieldDuration()).thenReturn(inputRPGDTO.getYieldDuration());
+        when(updatedRPG.getTransportProtocol())
+                
.thenReturn(SiteToSiteTransportProtocol.valueOf(inputRPGDTO.getTransportProtocol()));
+        when(updatedRPG.getProxyHost()).thenReturn(inputRPGDTO.getProxyHost());
+        when(updatedRPG.getProxyPort()).thenReturn(inputRPGDTO.getProxyPort());
+        when(updatedRPG.getProxyUser()).thenReturn(inputRPGDTO.getProxyUser());
+        
when(updatedRPG.getProxyPassword()).thenReturn(inputRPGDTO.getProxyPassword());
+
+        when(joinPoint.proceed()).thenReturn(updatedRPG);
+
+        // Capture added actions so that those can be asserted later.
+        final AuditService auditService = mock(AuditService.class);
+        final AtomicReference<Collection<Action>> addedActions = new 
AtomicReference<>();
+        doAnswer(invocation -> {
+            Collection<Action> actions = invocation.getArgumentAt(0, 
Collection.class);
+            addedActions.set(actions);
+            return null;
+        }).when(auditService).addActions(any());
+
+        auditor.setAuditService(auditService);
+
+        auditor.auditUpdateProcessGroupConfiguration(joinPoint, inputRPGDTO, 
remoteProcessGroupDAO);
+
+
+        final Collection<Action> actions = addedActions.get();
+
+        // Assert common action values.
+        if (actions != null) {
+            actions.forEach(action -> {
+                assertEquals(remoteProcessGroupId, action.getSourceId());
+                assertEquals("user-id", action.getUserIdentity());
+                assertEquals(targetUrl, 
((RemoteProcessGroupDetails)action.getComponentDetails()).getUri());
+                assertNotNull(action.getTimestamp());
+            });
+        }
+
+        return actions;
+    }
+
+    private RemoteProcessGroup defaultRemoteProcessGroup() {
+        final RemoteProcessGroup existingRPG = mock(RemoteProcessGroup.class);
+        
when(existingRPG.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
+        when(existingRPG.isTransmitting()).thenReturn(false);
+        when(existingRPG.getProxyPort()).thenReturn(null);
+        return existingRPG;
+    }
+
+    private RemoteProcessGroupDTO defaultInput() {
+        final RemoteProcessGroupDTO inputRPGDTO = new RemoteProcessGroupDTO();
+        inputRPGDTO.setTransportProtocol("RAW");
+        inputRPGDTO.setTransmitting(false);
+        return inputRPGDTO;
+    }
+
+    @Test
+    public void testEnableTransmission() throws Throwable {
+
+        final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
+        when(existingRPG.isTransmitting()).thenReturn(false);
+
+        final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
+        inputRPGDTO.setTransmitting(true);
+
+        final Collection<Action> actions = 
updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
+
+        assertEquals(1, actions.size());
+        final Action action = actions.iterator().next();
+        assertEquals(Operation.Start, action.getOperation());
+        assertNull(action.getActionDetails());
+
+    }
+
+    @Test
+    public void testDisableTransmission() throws Throwable {
+
+        final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
+        when(existingRPG.isTransmitting()).thenReturn(true);
+
+        final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
+        inputRPGDTO.setTransmitting(false);
+
+        final Collection<Action> actions = 
updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
+
+        assertEquals(1, actions.size());
+        final Action action = actions.iterator().next();
+        assertEquals(Operation.Stop, action.getOperation());
+        assertNull(action.getActionDetails());
+
+    }
+
+    @Test
+    public void testConfigureCommunicationsTimeout() throws Throwable {
+
+        final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
+        when(existingRPG.getCommunicationsTimeout()).thenReturn("30 sec");
+
+        final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
+        inputRPGDTO.setCommunicationsTimeout("31 sec");
+
+        final Collection<Action> actions = 
updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
+
+        assertEquals(1, actions.size());
+        final Action action = actions.iterator().next();
+        assertEquals(Operation.Configure, action.getOperation());
+        assertConfigureDetails(action.getActionDetails(), "Communications 
Timeout",
+                existingRPG.getCommunicationsTimeout(), 
inputRPGDTO.getCommunicationsTimeout());
+
+    }
+
+    private void assertConfigureDetails(final ActionDetails actionDetails, 
final String name,
+                                        final Object previousValue, final 
Object value) {
+        final ConfigureDetails configureDetails = (ConfigureDetails) 
actionDetails;
+        assertEquals(name, configureDetails.getName());
+        assertEquals(previousValue != null ? previousValue.toString() : "", 
configureDetails.getPreviousValue());
+        assertEquals(value != null ? value.toString() : "", 
configureDetails.getValue());
+    }
+
+    @Test
+    public void testConfigureYieldDuration() throws Throwable {
+
+        final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
+        when(existingRPG.getYieldDuration()).thenReturn("10 sec");
+
+        final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
+        inputRPGDTO.setYieldDuration("11 sec");
+
+        final Collection<Action> actions = 
updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
+
+        assertEquals(1, actions.size());
+        final Action action = actions.iterator().next();
+        assertEquals(Operation.Configure, action.getOperation());
+        assertConfigureDetails(action.getActionDetails(), "Yield Duration",
+                existingRPG.getYieldDuration(), 
inputRPGDTO.getYieldDuration());
+
+    }
+
+    @Test
+    public void testConfigureTransportProtocol() throws Throwable {
+
+        final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
+        
when(existingRPG.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
+
+        final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
+        inputRPGDTO.setTransportProtocol("HTTP");
+
+        final Collection<Action> actions = 
updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
+
+        assertEquals(1, actions.size());
+        final Action action = actions.iterator().next();
+        assertEquals(Operation.Configure, action.getOperation());
+        assertConfigureDetails(action.getActionDetails(), "Transport Protocol",
+                existingRPG.getTransportProtocol().name(), 
inputRPGDTO.getTransportProtocol());
+
+    }
+
+    @Test
+    public void testConfigureProxyHost() throws Throwable {
+
+        final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
+
+        final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
+        inputRPGDTO.setProxyHost("proxy.example.com");
+
+        final Collection<Action> actions = 
updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
+
+        assertEquals(1, actions.size());
+        final Action action = actions.iterator().next();
+        assertEquals(Operation.Configure, action.getOperation());
+        assertConfigureDetails(action.getActionDetails(), "Proxy Host",
+                existingRPG.getProxyHost(), inputRPGDTO.getProxyHost());
+
+    }
+
+    @Test
+    public void testConfigureProxyHostUpdate() throws Throwable {
+
+        final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
+        when(existingRPG.getProxyHost()).thenReturn("proxy1.example.com");
+
+        final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
+        inputRPGDTO.setProxyHost("proxy2.example.com");
+
+        final Collection<Action> actions = 
updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
+
+        assertEquals(1, actions.size());
+        final Action action = actions.iterator().next();
+        assertEquals(Operation.Configure, action.getOperation());
+        assertConfigureDetails(action.getActionDetails(), "Proxy Host",
+                existingRPG.getProxyHost(), inputRPGDTO.getProxyHost());
+
+    }
+
+    @Test
+    public void testConfigureProxyHostClear() throws Throwable {
+
+        final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
+        when(existingRPG.getProxyHost()).thenReturn("proxy.example.com");
+
+        final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
+        inputRPGDTO.setProxyHost("");
+
+        final Collection<Action> actions = 
updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
+
+        assertEquals(1, actions.size());
+        final Action action = actions.iterator().next();
+        assertEquals(Operation.Configure, action.getOperation());
+        assertConfigureDetails(action.getActionDetails(), "Proxy Host",
+                existingRPG.getProxyHost(), inputRPGDTO.getProxyHost());
+
+    }
+
+    @Test
+    public void testConfigureProxyPort() throws Throwable {
+
+        final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
+
+        final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
+        inputRPGDTO.setProxyPort(3128);
+
+        final Collection<Action> actions = 
updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
+
+        assertEquals(1, actions.size());
+        final Action action = actions.iterator().next();
+        assertEquals(Operation.Configure, action.getOperation());
+        assertConfigureDetails(action.getActionDetails(), "Proxy Port",
+                existingRPG.getProxyPort(), inputRPGDTO.getProxyPort());
+
+    }
+
+
+    @Test
+    public void testConfigureProxyPortClear() throws Throwable {
+
+        final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
+        when(existingRPG.getProxyPort()).thenReturn(3128);
+
+        final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
+        inputRPGDTO.setProxyPort(null);
+
+        final Collection<Action> actions = 
updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
+
+        assertEquals(1, actions.size());
+        final Action action = actions.iterator().next();
+        assertEquals(Operation.Configure, action.getOperation());
+        assertConfigureDetails(action.getActionDetails(), "Proxy Port",
+                existingRPG.getProxyPort(), inputRPGDTO.getProxyPort());
+
+    }
+
+    @Test
+    public void testConfigureProxyUser() throws Throwable {
+
+        final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
+
+        final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
+        inputRPGDTO.setProxyUser("proxy-user");
+
+        final Collection<Action> actions = 
updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
+
+        assertEquals(1, actions.size());
+        final Action action = actions.iterator().next();
+        assertEquals(Operation.Configure, action.getOperation());
+        assertConfigureDetails(action.getActionDetails(), "Proxy User",
+                existingRPG.getProxyUser(), inputRPGDTO.getProxyUser());
+
+    }
+
+    @Test
+    public void testConfigureProxyUserClear() throws Throwable {
+
+        final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
+        when(existingRPG.getProxyUser()).thenReturn("proxy-user");
+
+        final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
+        inputRPGDTO.setProxyUser(null);
+
+        final Collection<Action> actions = 
updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
+
+        assertEquals(1, actions.size());
+        final Action action = actions.iterator().next();
+        assertEquals(Operation.Configure, action.getOperation());
+        assertConfigureDetails(action.getActionDetails(), "Proxy User",
+                existingRPG.getProxyUser(), inputRPGDTO.getProxyUser());
+
+    }
+
+    @Test
+    public void testConfigureProxyPassword() throws Throwable {
+
+        final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
+
+        final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
+        inputRPGDTO.setProxyPassword("proxy-password");
+
+        final Collection<Action> actions = 
updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
+
+        assertEquals(1, actions.size());
+        final Action action = actions.iterator().next();
+        assertEquals(Operation.Configure, action.getOperation());
+        assertConfigureDetails(action.getActionDetails(), "Proxy Password","", 
SENSITIVE_VALUE_MASK);
+
+    }
+
+    @Test
+    public void testConfigureProxyPasswordClear() throws Throwable {
+
+        final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
+        when(existingRPG.getProxyPassword()).thenReturn("proxy-password");
+
+        final RemoteProcessGroupDTO inputRPGDTO = defaultInput();
+        inputRPGDTO.setProxyPassword(null);
+
+        final Collection<Action> actions = 
updateProcessGroupConfiguration(inputRPGDTO, existingRPG);
+
+        assertEquals(1, actions.size());
+        final Action action = actions.iterator().next();
+        assertEquals(Operation.Configure, action.getOperation());
+        assertConfigureDetails(action.getActionDetails(), "Proxy Password", 
SENSITIVE_VALUE_MASK, "");
+
+    }
+
+    @SuppressWarnings("unchecked")
+    private Collection<Action> 
updateProcessGroupInputPortConfiguration(RemoteProcessGroupPortDTO 
inputRPGPortDTO, RemoteGroupPort existingRPGPort) throws Throwable {
+        final RemoteProcessGroup existingRPG = defaultRemoteProcessGroup();
+        final RemoteProcessGroupAuditor auditor = new 
RemoteProcessGroupAuditor();
+        final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
+        final String remoteProcessGroupId = "remote-process-group-id";
+        inputRPGPortDTO.setId(remoteProcessGroupId);
+
+        final String targetUrl = "http://localhost:8080/nifi";;
+        when(existingRPG.getIdentifier()).thenReturn(remoteProcessGroupId);
+        when(existingRPG.getTargetUri()).thenReturn(targetUrl);
+
+        final RemoteProcessGroupDAO remoteProcessGroupDAO = 
mock(RemoteProcessGroupDAO.class);
+        when(remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId))
+                .thenReturn(existingRPG);
+
+        
when(existingRPG.getInputPort(eq(inputRPGPortDTO.getId()))).thenReturn(existingRPGPort);
+
+        // Setup updatedRPGPort mock based on inputRPGPortDTO.
+        final RemoteGroupPort updatedRPGPort = mock(RemoteGroupPort.class);
+        final String portName = existingRPGPort.getName();
+        when(updatedRPGPort.getName()).thenReturn(portName);
+        if (inputRPGPortDTO.isTransmitting() != null) {
+            
when(updatedRPGPort.isRunning()).thenReturn(inputRPGPortDTO.isTransmitting());
+        }
+        
when(updatedRPGPort.getMaxConcurrentTasks()).thenReturn(inputRPGPortDTO.getConcurrentlySchedulableTaskCount());
+        
when(updatedRPGPort.isUseCompression()).thenReturn(inputRPGPortDTO.getUseCompression());
+
+        when(joinPoint.proceed()).thenReturn(updatedRPGPort);
+
+        // Capture added actions so that those can be asserted later.
+        final AuditService auditService = mock(AuditService.class);
+        final AtomicReference<Collection<Action>> addedActions = new 
AtomicReference<>();
+        doAnswer(invocation -> {
+            Collection<Action> actions = invocation.getArgumentAt(0, 
Collection.class);
+            addedActions.set(actions);
+            return null;
+        }).when(auditService).addActions(any());
+
+        auditor.setAuditService(auditService);
+
+        auditor.auditUpdateProcessGroupInputPortConfiguration(joinPoint, 
remoteProcessGroupId, inputRPGPortDTO, remoteProcessGroupDAO);
+
+
+        final Collection<Action> actions = addedActions.get();
+
+        // Assert common action values.
+        if (actions != null) {
+            actions.forEach(action -> {
+                assertEquals(remoteProcessGroupId, action.getSourceId());
+                assertEquals("user-id", action.getUserIdentity());
+                assertEquals(targetUrl, 
((RemoteProcessGroupDetails)action.getComponentDetails()).getUri());
+                assertNotNull(action.getTimestamp());
+            });
+        }
+
+        return actions;
+    }
+
+    private RemoteGroupPort defaultRemoteGroupPort() {
+        final RemoteGroupPort existingRPGPort = mock(RemoteGroupPort.class);
+        when(existingRPGPort.isRunning()).thenReturn(false);
+        when(existingRPGPort.getMaxConcurrentTasks()).thenReturn(1);
+        when(existingRPGPort.isUseCompression()).thenReturn(false);
+        return existingRPGPort;
+    }
+
+    private RemoteProcessGroupPortDTO defaultRemoteProcessGroupPortDTO() {
+        final RemoteProcessGroupPortDTO inputRPGPortDTO = new 
RemoteProcessGroupPortDTO();
+        inputRPGPortDTO.setConcurrentlySchedulableTaskCount(1);
+        inputRPGPortDTO.setUseCompression(false);
+        return inputRPGPortDTO;
+    }
+
+    @Test
+    public void testEnablePort() throws Throwable {
+
+        final RemoteGroupPort existingRPGPort = defaultRemoteGroupPort();
+        when(existingRPGPort.getName()).thenReturn("input-port-1");
+        when(existingRPGPort.isRunning()).thenReturn(false);
+
+        final RemoteProcessGroupPortDTO inputRPGPortDTO = 
defaultRemoteProcessGroupPortDTO();
+        inputRPGPortDTO.setTransmitting(true);
+
+        final Collection<Action> actions = 
updateProcessGroupInputPortConfiguration(inputRPGPortDTO, existingRPGPort);
+
+        assertEquals(1, actions.size());
+        final Action action = actions.iterator().next();
+        assertEquals(Operation.Configure, action.getOperation());
+        assertConfigureDetails(action.getActionDetails(), 
"input-port-1.Transmission", "disabled", "enabled");
+
+    }
+
+    @Test
+    public void testDisablePort() throws Throwable {
+
+        final RemoteGroupPort existingRPGPort = defaultRemoteGroupPort();
+        when(existingRPGPort.getName()).thenReturn("input-port-1");
+        when(existingRPGPort.isRunning()).thenReturn(true);
+
+        final RemoteProcessGroupPortDTO inputRPGPortDTO = 
defaultRemoteProcessGroupPortDTO();
+        inputRPGPortDTO.setTransmitting(false);
+
+        final Collection<Action> actions = 
updateProcessGroupInputPortConfiguration(inputRPGPortDTO, existingRPGPort);
+
+        assertEquals(1, actions.size());
+        final Action action = actions.iterator().next();
+        assertEquals(Operation.Configure, action.getOperation());
+        assertConfigureDetails(action.getActionDetails(), 
"input-port-1.Transmission", "enabled", "disabled");
+
+    }
+
+    @Test
+    public void testConfigurePortConcurrency() throws Throwable {
+
+        final RemoteGroupPort existingRPGPort = defaultRemoteGroupPort();
+        when(existingRPGPort.getName()).thenReturn("input-port-1");
+
+        final RemoteProcessGroupPortDTO inputRPGPortDTO = 
defaultRemoteProcessGroupPortDTO();
+        inputRPGPortDTO.setConcurrentlySchedulableTaskCount(2);
+
+        final Collection<Action> actions = 
updateProcessGroupInputPortConfiguration(inputRPGPortDTO, existingRPGPort);
+
+        assertEquals(1, actions.size());
+        final Action action = actions.iterator().next();
+        assertEquals(Operation.Configure, action.getOperation());
+        assertConfigureDetails(action.getActionDetails(), 
"input-port-1.Concurrent Tasks", "1", "2");
+
+    }
+
+    @Test
+    public void testConfigurePortCompression() throws Throwable {
+
+        final RemoteGroupPort existingRPGPort = defaultRemoteGroupPort();
+        when(existingRPGPort.getName()).thenReturn("input-port-1");
+
+        final RemoteProcessGroupPortDTO inputRPGPortDTO = 
defaultRemoteProcessGroupPortDTO();
+        inputRPGPortDTO.setUseCompression(true);
+
+        final Collection<Action> actions = 
updateProcessGroupInputPortConfiguration(inputRPGPortDTO, existingRPGPort);
+
+        assertEquals(1, actions.size());
+        final Action action = actions.iterator().next();
+        assertEquals(Operation.Configure, action.getOperation());
+        assertConfigureDetails(action.getActionDetails(), 
"input-port-1.Compressed", "false", "true");
+
+    }
+}

Reply via email to