This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new df90c65  NIFI-6735 - validate components before restarting processors 
following parameter context update.
df90c65 is described below

commit df90c6524695ea10d76d58873dfdf4af4b206a96
Author: Rob Fellows <[email protected]>
AuthorDate: Tue Oct 1 15:38:53 2019 -0400

    NIFI-6735 - validate components before restarting processors following 
parameter context update.
    
    This closes #3782.
    
    Signed-off-by: Mark Payne <[email protected]>
---
 .../nifi/web/api/ParameterContextResource.java     |   7 +-
 .../util/ClusterReplicationComponentLifecycle.java | 179 ++++++++++++++++++---
 .../nifi/web/util/LocalComponentLifecycle.java     | 114 ++++++++++---
 3 files changed, 259 insertions(+), 41 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java
index 5c8fd6a..20fcad4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java
@@ -865,6 +865,7 @@ public class ParameterContextResource extends 
ApplicationResource {
         final ParameterContextEntity updatedEntity;
         try {
             updatedEntity = performParameterContextUpdate(asyncRequest, uri, 
replicateRequest, revision, updatedContextEntity);
+            asyncRequest.markStepComplete();
             logger.info("Successfully updated Parameter Context with ID {}", 
updatedContextEntity.getId());
         } finally {
             // TODO: can almost certainly be refactored so that the same code 
is shared between VersionsResource and ParameterContextResource.
@@ -968,8 +969,6 @@ public class ParameterContextResource extends 
ApplicationResource {
             logger.info("Restarting {} Processors after having updated 
Parameter Context", processors.size());
         }
 
-        asyncRequest.markStepComplete();
-
         // Step 14. Restart all components
         final Set<AffectedComponentEntity> componentsToStart = 
getUpdatedEntities(processors);
 
@@ -978,6 +977,7 @@ public class ParameterContextResource extends 
ApplicationResource {
 
         try {
             componentLifecycle.scheduleComponents(uri, "root", 
componentsToStart, ScheduledState.RUNNING, startComponentsPause, 
InvalidComponentAction.SKIP);
+            asyncRequest.markStepComplete();
         } catch (final IllegalStateException ise) {
             // Component Lifecycle will restart the Processors only if they 
are valid. If IllegalStateException gets thrown, we need to provide
             // a more intelligent error message as to exactly what happened, 
rather than indicate that the flow could not be updated.
@@ -1003,8 +1003,6 @@ public class ParameterContextResource extends 
ApplicationResource {
             logger.info("Re-Enabling {} Controller Services after having 
updated Parameter Context", controllerServices.size());
         }
 
-        asyncRequest.markStepComplete();
-
         // Step 13. Re-enable all disabled controller services
         final CancellableTimedPause enableServicesPause = new 
CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
         asyncRequest.setCancelCallback(enableServicesPause::cancel);
@@ -1012,6 +1010,7 @@ public class ParameterContextResource extends 
ApplicationResource {
 
         try {
             componentLifecycle.activateControllerServices(uri, "root", 
servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause, 
InvalidComponentAction.SKIP);
+            asyncRequest.markStepComplete();
         } catch (final IllegalStateException ise) {
             // Component Lifecycle will re-enable the Controller Services only 
if they are valid. If IllegalStateException gets thrown, we need to provide
             // a more intelligent error message as to exactly what happened, 
rather than indicate that the Parameter Context could not be updated.
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java
index 934e927..2813daf 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java
@@ -101,6 +101,16 @@ public class ClusterReplicationComponentLifecycle 
implements ComponentLifecycle
 
         final NiFiUser user = NiFiUserUtils.getNiFiUser();
 
+        // If attempting to run the processors, validation must complete first
+        if (desiredState == ScheduledState.RUNNING) {
+            try {
+                waitForProcessorValidation(user, exampleUri, groupId, 
componentMap, pause);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new LifecycleManagementException("Interrupted while 
waiting for processors to complete validation");
+            }
+        }
+
         // Determine whether we should replicate only to the cluster 
coordinator, or if we should replicate directly to the cluster nodes themselves.
         try {
             final NodeResponse clusterResponse;
@@ -156,6 +166,64 @@ public class ClusterReplicationComponentLifecycle 
implements ComponentLifecycle
         return 
processorRevisions.stream().collect(Collectors.toMap(Revision::getComponentId, 
Function.identity()));
     }
 
+    private boolean waitForProcessorValidation(final NiFiUser user, final URI 
originalUri, final String groupId,
+                                               final Map<String, 
AffectedComponentEntity> processors, final Pause pause) throws 
InterruptedException {
+        URI groupUri;
+        try {
+            groupUri = new URI(originalUri.getScheme(), 
originalUri.getUserInfo(), originalUri.getHost(),
+                    originalUri.getPort(), "/nifi-api/process-groups/" + 
groupId + "/processors", "includeDescendantGroups=true", 
originalUri.getFragment());
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+
+        final Map<String, String> headers = new HashMap<>();
+        final MultivaluedMap<String, String> requestEntity = new 
MultivaluedHashMap<>();
+
+        boolean continuePolling = true;
+        while (continuePolling) {
+
+            // Determine whether we should replicate only to the cluster 
coordinator, or if we should replicate directly to the cluster nodes themselves.
+            final NodeResponse clusterResponse;
+            if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+                clusterResponse = getRequestReplicator().replicate(user, 
HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
+            } else {
+                clusterResponse = getRequestReplicator().forwardToCoordinator(
+                        getClusterCoordinatorNode(), user, HttpMethod.GET, 
groupUri, requestEntity, headers).awaitMergedResponse();
+            }
+
+            if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
+                return false;
+            }
+
+            final ProcessorsEntity processorsEntity = 
getResponseEntity(clusterResponse, ProcessorsEntity.class);
+            final Set<ProcessorEntity> processorEntities = 
processorsEntity.getProcessors();
+
+            if (isProcessorValidationComplete(processorEntities, processors)) {
+                logger.debug("All {} processors of interest now have been 
validated", processors.size());
+                return true;
+            }
+
+            // Not all of the processors are done validating. Pause for a bit 
and poll again.
+            continuePolling = pause.pause();
+        }
+
+        return false;
+    }
+
+    private boolean isProcessorValidationComplete(Set<ProcessorEntity> 
processorEntities, Map<String, AffectedComponentEntity> affectedComponents) {
+        updateAffectedProcessors(processorEntities, affectedComponents);
+        for (final ProcessorEntity entity : processorEntities) {
+            if (!affectedComponents.containsKey(entity.getId())) {
+                continue;
+            }
+
+            if 
(ProcessorDTO.VALIDATING.equals(entity.getComponent().getValidationStatus())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     /**
      * Periodically polls the process group with the given ID, waiting for all 
processors whose ID's are given to have the given Scheduled State.
      *
@@ -230,30 +298,32 @@ public class ClusterReplicationComponentLifecycle 
implements ComponentLifecycle
         return entity;
     }
 
+    private void updateAffectedProcessors(final Set<ProcessorEntity> 
processorEntities, final Map<String, AffectedComponentEntity> 
affectedComponents) {
+        // update the affected processors
+        processorEntities.stream()
+                .filter(entity -> 
affectedComponents.containsKey(entity.getId()))
+                .forEach(entity -> {
+                    final AffectedComponentEntity affectedComponentEntity = 
affectedComponents.get(entity.getId());
+                    affectedComponentEntity.setRevision(entity.getRevision());
+
+                    // only consider updating this component if the user has 
permissions to it
+                    if 
(Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
+                        final AffectedComponentDTO affectedComponent = 
affectedComponentEntity.getComponent();
+                        
affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus());
+                        
affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount());
+
+                        if 
(Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
+                            
affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors());
+                        }
+                    }
+                });
+    }
 
     private boolean isProcessorActionComplete(final Set<ProcessorEntity> 
processorEntities, final Map<String, AffectedComponentEntity> 
affectedComponents, final ScheduledState desiredState,
                                               final InvalidComponentAction 
invalidComponentAction) throws LifecycleManagementException {
-
         final String desiredStateName = desiredState.name();
 
-        // update the affected processors
-        processorEntities.stream()
-            .filter(entity -> affectedComponents.containsKey(entity.getId()))
-            .forEach(entity -> {
-                final AffectedComponentEntity affectedComponentEntity = 
affectedComponents.get(entity.getId());
-                affectedComponentEntity.setRevision(entity.getRevision());
-
-                // only consider update this component if the user had 
permissions to it
-                if 
(Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
-                    final AffectedComponentDTO affectedComponent = 
affectedComponentEntity.getComponent();
-                    
affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus());
-                    
affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount());
-
-                    if 
(Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
-                        
affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors());
-                    }
-                }
-            });
+        updateAffectedProcessors(processorEntities, affectedComponents);
 
         for (final ProcessorEntity entity : processorEntities) {
             if (!affectedComponents.containsKey(entity.getId())) {
@@ -320,6 +390,16 @@ public class ClusterReplicationComponentLifecycle 
implements ComponentLifecycle
 
         final NiFiUser user = NiFiUserUtils.getNiFiUser();
 
+        // If enabling services, validation must complete first
+        if (desiredState == ControllerServiceState.ENABLED) {
+            try {
+                waitForControllerServiceValidation(user, originalUri, groupId, 
affectedServiceIds, pause);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new LifecycleManagementException("Interrupted while 
waiting for Controller Services to complete validation");
+            }
+        }
+
         // Determine whether we should replicate only to the cluster 
coordinator, or if we should replicate directly to the cluster nodes themselves.
         try {
             final NodeResponse clusterResponse;
@@ -352,6 +432,67 @@ public class ClusterReplicationComponentLifecycle 
implements ComponentLifecycle
             .collect(Collectors.toSet());
     }
 
+    private boolean waitForControllerServiceValidation(final NiFiUser user, 
final URI originalUri, final String groupId,
+                                                       final Set<String> 
serviceIds, final Pause pause)
+            throws InterruptedException {
+
+        URI groupUri;
+        try {
+            groupUri = new URI(originalUri.getScheme(), 
originalUri.getUserInfo(), originalUri.getHost(),
+                    originalUri.getPort(), "/nifi-api/flow/process-groups/" + 
groupId + "/controller-services", 
"includeAncestorGroups=false,includeDescendantGroups=true", 
originalUri.getFragment());
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+
+        final Map<String, String> headers = new HashMap<>();
+        final MultivaluedMap<String, String> requestEntity = new 
MultivaluedHashMap<>();
+
+        boolean continuePolling = true;
+        while (continuePolling) {
+
+            // Determine whether we should replicate only to the cluster 
coordinator, or if we should replicate directly to the cluster nodes themselves.
+            final NodeResponse clusterResponse;
+            if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+                clusterResponse = getRequestReplicator().replicate(user, 
HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
+            } else {
+                clusterResponse = getRequestReplicator().forwardToCoordinator(
+                        getClusterCoordinatorNode(), user, HttpMethod.GET, 
groupUri, requestEntity, headers).awaitMergedResponse();
+            }
+
+            if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
+                return false;
+            }
+
+            final ControllerServicesEntity controllerServicesEntity = 
getResponseEntity(clusterResponse, ControllerServicesEntity.class);
+            final Set<ControllerServiceEntity> serviceEntities = 
controllerServicesEntity.getControllerServices();
+
+            final Map<String, AffectedComponentEntity> affectedServices = 
serviceEntities.stream()
+                    .collect(Collectors.toMap(ControllerServiceEntity::getId, 
dtoFactory::createAffectedComponentEntity));
+
+            if (isControllerServiceValidationComplete(serviceEntities, 
affectedServices)) {
+                logger.debug("All {} controller services of interest have 
completed validation", affectedServices.size());
+                return true;
+            }
+            continuePolling = pause.pause();
+        }
+
+        return false;
+    }
+
+    private boolean isControllerServiceValidationComplete(final 
Set<ControllerServiceEntity> controllerServiceEntities, final Map<String, 
AffectedComponentEntity> affectedComponents) {
+        updateAffectedControllerServices(controllerServiceEntities, 
affectedComponents);
+        for (final ControllerServiceEntity entity : controllerServiceEntities) 
{
+            if (!affectedComponents.containsKey(entity.getId())) {
+                continue;
+            }
+
+            if 
(ControllerServiceDTO.VALIDATING.equals(entity.getComponent().getValidationStatus()))
 {
+                return false;
+            }
+        }
+        return true;
+    }
+
     /**
      * Periodically polls the process group with the given ID, waiting for all 
controller services whose ID's are given to have the given Controller Service 
State.
      *
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java
index b597797..1a5fc6b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java
@@ -105,6 +105,9 @@ public class LocalComponentLifecycle implements 
ComponentLifecycle {
 
         logger.debug("Starting components with ID's {} from Process Group {}", 
componentRevisions.keySet(), processGroupId);
 
+        // Wait for all affected processors to be either VALID or INVALID
+        waitForProcessorValidation(processGroupId, affectedComponents, pause);
+
         serviceFacade.verifyScheduleComponents(processGroupId, 
ScheduledState.RUNNING, componentRevisions.keySet());
         serviceFacade.scheduleComponents(processGroupId, 
ScheduledState.RUNNING, componentRevisions);
 
@@ -130,6 +133,42 @@ public class LocalComponentLifecycle implements 
ComponentLifecycle {
         waitForProcessorState(processGroupId, affectedComponents, 
ScheduledState.STOPPED, pause, invalidComponentAction);
     }
 
+
+    /**
+     * Waits for all given Processors to complete validation
+     *
+     * @return <code>true</code> if all processors have completed validation, 
<code>false</code> if the given {@link Pause}
+     *         indicated to give up before all of the processors have 
completed validation
+     */
+    private boolean waitForProcessorValidation(final String groupId, final 
Map<String, AffectedComponentEntity> affectedComponents, final Pause pause) {
+
+        logger.debug("Waiting for {} processors to complete validation", 
affectedComponents.size());
+        boolean continuePolling = true;
+        while (continuePolling) {
+            final Set<ProcessorEntity> processorEntities = 
serviceFacade.getProcessors(groupId, true);
+            if (isProcessorValidationComplete(processorEntities, 
affectedComponents)) {
+                logger.debug("All {} processors of interest have completed 
validation", affectedComponents.size());
+                return true;
+            }
+            continuePolling = pause.pause();
+        }
+        return false;
+    }
+
+    private boolean isProcessorValidationComplete(final Set<ProcessorEntity> 
processorEntities, final Map<String, AffectedComponentEntity> 
affectedComponents) {
+        updateAffectedProcessors(processorEntities, affectedComponents);
+        for (final ProcessorEntity entity : processorEntities) {
+            if (!affectedComponents.containsKey(entity.getId())) {
+                continue;
+            }
+
+            if 
(ProcessorDTO.VALIDATING.equals(entity.getComponent().getValidationStatus())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     /**
      * Waits for all of the given Processors to reach the given Scheduled 
State.
      *
@@ -157,29 +196,33 @@ public class LocalComponentLifecycle implements 
ComponentLifecycle {
         return false;
     }
 
+    private void updateAffectedProcessors(final Set<ProcessorEntity> 
processorEntities, final Map<String, AffectedComponentEntity> 
affectedComponents) {
+        // update the affected processors
+        processorEntities.stream()
+                .filter(entity -> 
affectedComponents.containsKey(entity.getId()))
+                .forEach(entity -> {
+                    final AffectedComponentEntity affectedComponentEntity = 
affectedComponents.get(entity.getId());
+                    affectedComponentEntity.setRevision(entity.getRevision());
+
+                    // only consider updating this component if the user has 
permissions to it
+                    if 
(Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
+                        final AffectedComponentDTO affectedComponent = 
affectedComponentEntity.getComponent();
+                        
affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus());
+                        
affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount());
+
+                        if 
(Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
+                            
affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors());
+                        }
+                    }
+                });
+    }
+
     private boolean isProcessorActionComplete(final Set<ProcessorEntity> 
processorEntities, final Map<String, AffectedComponentEntity> 
affectedComponents, final ScheduledState desiredState,
                                               final InvalidComponentAction 
invalidComponentAction) throws LifecycleManagementException {
 
         final String desiredStateName = desiredState.name();
 
-        // update the affected processors
-        processorEntities.stream()
-            .filter(entity -> affectedComponents.containsKey(entity.getId()))
-            .forEach(entity -> {
-                final AffectedComponentEntity affectedComponentEntity = 
affectedComponents.get(entity.getId());
-                affectedComponentEntity.setRevision(entity.getRevision());
-
-                // only consider updating this component if the user has 
permissions to it
-                if 
(Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
-                    final AffectedComponentDTO affectedComponent = 
affectedComponentEntity.getComponent();
-                    
affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus());
-                    
affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount());
-
-                    if 
(Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
-                        
affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors());
-                    }
-                }
-            });
+        updateAffectedProcessors(processorEntities, affectedComponents);
 
         for (final ProcessorEntity entity : processorEntities) {
             if (!affectedComponents.containsKey(entity.getId())) {
@@ -224,6 +267,8 @@ public class LocalComponentLifecycle implements 
ComponentLifecycle {
 
         logger.debug("Enabling Controller Services with ID's {} from Process 
Group {}", serviceRevisions.keySet(), processGroupId);
 
+        waitForControllerServiceValidation(processGroupId, affectedServices, 
pause);
+
         serviceFacade.verifyActivateControllerServices(processGroupId, 
ControllerServiceState.ENABLED, affectedServices.keySet());
         serviceFacade.activateControllerServices(processGroupId, 
ControllerServiceState.ENABLED, serviceRevisions);
         waitForControllerServiceState(processGroupId, affectedServices, 
ControllerServiceState.ENABLED, pause, invalidComponentAction);
@@ -282,6 +327,39 @@ public class LocalComponentLifecycle implements 
ComponentLifecycle {
         }
     }
 
+    /**
+     * Waits for all given Controller Services to complete validation
+     *
+     * @return <code>true</code> if all controller services have completed validation, 
<code>false</code> if the given {@link Pause}
+     *         indicated to give up before all of the controller services have 
completed validation
+     */
+    private boolean waitForControllerServiceValidation(final String groupId, 
final Map<String, AffectedComponentEntity> affectedComponents, final Pause 
pause) {
+        logger.debug("Waiting for {} controller services to complete 
validation", affectedComponents.size());
+        boolean continuePolling = true;
+        while (continuePolling) {
+            final Set<ControllerServiceEntity> serviceEntities = 
serviceFacade.getControllerServices(groupId, false, true);
+            if (isControllerServiceValidationComplete(serviceEntities, 
affectedComponents)) {
+                logger.debug("All {} controller services of interest have 
completed validation", affectedComponents.size());
+                return true;
+            }
+            continuePolling = pause.pause();
+        }
+        return false;
+    }
+
+    private boolean isControllerServiceValidationComplete(final 
Set<ControllerServiceEntity> controllerServiceEntities, final Map<String, 
AffectedComponentEntity> affectedComponents) {
+        updateAffectedControllerServices(controllerServiceEntities, 
affectedComponents);
+        for (final ControllerServiceEntity entity : controllerServiceEntities) 
{
+            if (!affectedComponents.containsKey(entity.getId())) {
+                continue;
+            }
+
+            if 
(ControllerServiceDTO.VALIDATING.equals(entity.getComponent().getValidationStatus()))
 {
+                return false;
+            }
+        }
+        return true;
+    }
 
     /**
      * Periodically polls the process group with the given ID, waiting for all 
controller services whose ID's are given to have the given Controller Service 
State.

Reply via email to