This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new df90c65 NIFI-6735 - validate components before restarting processors
following parameter context update.
df90c65 is described below
commit df90c6524695ea10d76d58873dfdf4af4b206a96
Author: Rob Fellows <[email protected]>
AuthorDate: Tue Oct 1 15:38:53 2019 -0400
NIFI-6735 - validate components before restarting processors following
parameter context update.
This closes #3782.
Signed-off-by: Mark Payne <[email protected]>
---
.../nifi/web/api/ParameterContextResource.java | 7 +-
.../util/ClusterReplicationComponentLifecycle.java | 179 ++++++++++++++++++---
.../nifi/web/util/LocalComponentLifecycle.java | 114 ++++++++++---
3 files changed, 259 insertions(+), 41 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java
index 5c8fd6a..20fcad4 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java
@@ -865,6 +865,7 @@ public class ParameterContextResource extends
ApplicationResource {
final ParameterContextEntity updatedEntity;
try {
updatedEntity = performParameterContextUpdate(asyncRequest, uri,
replicateRequest, revision, updatedContextEntity);
+ asyncRequest.markStepComplete();
logger.info("Successfully updated Parameter Context with ID {}",
updatedContextEntity.getId());
} finally {
// TODO: can almost certainly be refactored so that the same code
is shared between VersionsResource and ParameterContextResource.
@@ -968,8 +969,6 @@ public class ParameterContextResource extends
ApplicationResource {
logger.info("Restarting {} Processors after having updated
Parameter Context", processors.size());
}
- asyncRequest.markStepComplete();
-
// Step 14. Restart all components
final Set<AffectedComponentEntity> componentsToStart =
getUpdatedEntities(processors);
@@ -978,6 +977,7 @@ public class ParameterContextResource extends
ApplicationResource {
try {
componentLifecycle.scheduleComponents(uri, "root",
componentsToStart, ScheduledState.RUNNING, startComponentsPause,
InvalidComponentAction.SKIP);
+ asyncRequest.markStepComplete();
} catch (final IllegalStateException ise) {
// Component Lifecycle will restart the Processors only if they
are valid. If IllegalStateException gets thrown, we need to provide
// a more intelligent error message as to exactly what happened,
rather than indicate that the flow could not be updated.
@@ -1003,8 +1003,6 @@ public class ParameterContextResource extends
ApplicationResource {
logger.info("Re-Enabling {} Controller Services after having
updated Parameter Context", controllerServices.size());
}
- asyncRequest.markStepComplete();
-
// Step 13. Re-enable all disabled controller services
final CancellableTimedPause enableServicesPause = new
CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(enableServicesPause::cancel);
@@ -1012,6 +1010,7 @@ public class ParameterContextResource extends
ApplicationResource {
try {
componentLifecycle.activateControllerServices(uri, "root",
servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause,
InvalidComponentAction.SKIP);
+ asyncRequest.markStepComplete();
} catch (final IllegalStateException ise) {
// Component Lifecycle will re-enable the Controller Services only
if they are valid. If IllegalStateException gets thrown, we need to provide
// a more intelligent error message as to exactly what happened,
rather than indicate that the Parameter Context could not be updated.
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java
index 934e927..2813daf 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java
@@ -101,6 +101,16 @@ public class ClusterReplicationComponentLifecycle
implements ComponentLifecycle
final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ // If attempting to run the processors, validation must complete first
+ if (desiredState == ScheduledState.RUNNING) {
+ try {
+ waitForProcessorValidation(user, exampleUri, groupId,
componentMap, pause);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new LifecycleManagementException("Interrupted while
waiting for processors to complete validation");
+ }
+ }
+
// Determine whether we should replicate only to the cluster
coordinator, or if we should replicate directly to the cluster nodes themselves.
try {
final NodeResponse clusterResponse;
@@ -156,6 +166,64 @@ public class ClusterReplicationComponentLifecycle
implements ComponentLifecycle
return
processorRevisions.stream().collect(Collectors.toMap(Revision::getComponentId,
Function.identity()));
}
+ private boolean waitForProcessorValidation(final NiFiUser user, final URI
originalUri, final String groupId,
+ final Map<String,
AffectedComponentEntity> processors, final Pause pause) throws
InterruptedException {
+ URI groupUri;
+ try {
+ groupUri = new URI(originalUri.getScheme(),
originalUri.getUserInfo(), originalUri.getHost(),
+ originalUri.getPort(), "/nifi-api/process-groups/" +
groupId + "/processors", "includeDescendantGroups=true",
originalUri.getFragment());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+
+ final Map<String, String> headers = new HashMap<>();
+ final MultivaluedMap<String, String> requestEntity = new
MultivaluedHashMap<>();
+
+ boolean continuePolling = true;
+ while (continuePolling) {
+
+ // Determine whether we should replicate only to the cluster
coordinator, or if we should replicate directly to the cluster nodes themselves.
+ final NodeResponse clusterResponse;
+ if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+ clusterResponse = getRequestReplicator().replicate(user,
HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
+ } else {
+ clusterResponse = getRequestReplicator().forwardToCoordinator(
+ getClusterCoordinatorNode(), user, HttpMethod.GET,
groupUri, requestEntity, headers).awaitMergedResponse();
+ }
+
+ if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
+ return false;
+ }
+
+ final ProcessorsEntity processorsEntity =
getResponseEntity(clusterResponse, ProcessorsEntity.class);
+ final Set<ProcessorEntity> processorEntities =
processorsEntity.getProcessors();
+
+ if (isProcessorValidationComplete(processorEntities, processors)) {
+ logger.debug("All {} processors of interest now have been
validated", processors.size());
+ return true;
+ }
+
+ // Not all of the processors are done validating. Pause for a bit
and poll again.
+ continuePolling = pause.pause();
+ }
+
+ return false;
+ }
+
+ private boolean isProcessorValidationComplete(Set<ProcessorEntity>
processorEntities, Map<String, AffectedComponentEntity> affectedComponents) {
+ updateAffectedProcessors(processorEntities, affectedComponents);
+ for (final ProcessorEntity entity : processorEntities) {
+ if (!affectedComponents.containsKey(entity.getId())) {
+ continue;
+ }
+
+ if
(ProcessorDTO.VALIDATING.equals(entity.getComponent().getValidationStatus())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
/**
* Periodically polls the process group with the given ID, waiting for all
processors whose ID's are given to have the given Scheduled State.
*
@@ -230,30 +298,32 @@ public class ClusterReplicationComponentLifecycle
implements ComponentLifecycle
return entity;
}
+ private void updateAffectedProcessors(final Set<ProcessorEntity>
processorEntities, final Map<String, AffectedComponentEntity>
affectedComponents) {
+ // update the affected processors
+ processorEntities.stream()
+ .filter(entity ->
affectedComponents.containsKey(entity.getId()))
+ .forEach(entity -> {
+ final AffectedComponentEntity affectedComponentEntity =
affectedComponents.get(entity.getId());
+ affectedComponentEntity.setRevision(entity.getRevision());
+
+ // only consider updating this component if the user has
permissions to it
+ if
(Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
+ final AffectedComponentDTO affectedComponent =
affectedComponentEntity.getComponent();
+
affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus());
+
affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount());
+
+ if
(Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
+
affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors());
+ }
+ }
+ });
+ }
private boolean isProcessorActionComplete(final Set<ProcessorEntity>
processorEntities, final Map<String, AffectedComponentEntity>
affectedComponents, final ScheduledState desiredState,
final InvalidComponentAction
invalidComponentAction) throws LifecycleManagementException {
-
final String desiredStateName = desiredState.name();
- // update the affected processors
- processorEntities.stream()
- .filter(entity -> affectedComponents.containsKey(entity.getId()))
- .forEach(entity -> {
- final AffectedComponentEntity affectedComponentEntity =
affectedComponents.get(entity.getId());
- affectedComponentEntity.setRevision(entity.getRevision());
-
- // only consider update this component if the user had
permissions to it
- if
(Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
- final AffectedComponentDTO affectedComponent =
affectedComponentEntity.getComponent();
-
affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus());
-
affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount());
-
- if
(Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
-
affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors());
- }
- }
- });
+ updateAffectedProcessors(processorEntities, affectedComponents);
for (final ProcessorEntity entity : processorEntities) {
if (!affectedComponents.containsKey(entity.getId())) {
@@ -320,6 +390,16 @@ public class ClusterReplicationComponentLifecycle
implements ComponentLifecycle
final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ // If enabling services, validation must complete first
+ if (desiredState == ControllerServiceState.ENABLED) {
+ try {
+ waitForControllerServiceValidation(user, originalUri, groupId,
affectedServiceIds, pause);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new LifecycleManagementException("Interrupted while
waiting for Controller Services to complete validation");
+ }
+ }
+
// Determine whether we should replicate only to the cluster
coordinator, or if we should replicate directly to the cluster nodes themselves.
try {
final NodeResponse clusterResponse;
@@ -352,6 +432,67 @@ public class ClusterReplicationComponentLifecycle
implements ComponentLifecycle
.collect(Collectors.toSet());
}
+ private boolean waitForControllerServiceValidation(final NiFiUser user,
final URI originalUri, final String groupId,
+ final Set<String>
serviceIds, final Pause pause)
+ throws InterruptedException {
+
+ URI groupUri;
+ try {
+ groupUri = new URI(originalUri.getScheme(),
originalUri.getUserInfo(), originalUri.getHost(),
+ originalUri.getPort(), "/nifi-api/flow/process-groups/" +
groupId + "/controller-services",
"includeAncestorGroups=false,includeDescendantGroups=true",
originalUri.getFragment());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+
+ final Map<String, String> headers = new HashMap<>();
+ final MultivaluedMap<String, String> requestEntity = new
MultivaluedHashMap<>();
+
+ boolean continuePolling = true;
+ while (continuePolling) {
+
+ // Determine whether we should replicate only to the cluster
coordinator, or if we should replicate directly to the cluster nodes themselves.
+ final NodeResponse clusterResponse;
+ if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+ clusterResponse = getRequestReplicator().replicate(user,
HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
+ } else {
+ clusterResponse = getRequestReplicator().forwardToCoordinator(
+ getClusterCoordinatorNode(), user, HttpMethod.GET,
groupUri, requestEntity, headers).awaitMergedResponse();
+ }
+
+ if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
+ return false;
+ }
+
+ final ControllerServicesEntity controllerServicesEntity =
getResponseEntity(clusterResponse, ControllerServicesEntity.class);
+ final Set<ControllerServiceEntity> serviceEntities =
controllerServicesEntity.getControllerServices();
+
+ final Map<String, AffectedComponentEntity> affectedServices =
serviceEntities.stream()
+ .collect(Collectors.toMap(ControllerServiceEntity::getId,
dtoFactory::createAffectedComponentEntity));
+
+ if (isControllerServiceValidationComplete(serviceEntities,
affectedServices)) {
+ logger.debug("All {} controller services of interest have
completed validation", affectedServices.size());
+ return true;
+ }
+ continuePolling = pause.pause();
+ }
+
+ return false;
+ }
+
+ private boolean isControllerServiceValidationComplete(final
Set<ControllerServiceEntity> controllerServiceEntities, final Map<String,
AffectedComponentEntity> affectedComponents) {
+ updateAffectedControllerServices(controllerServiceEntities,
affectedComponents);
+ for (final ControllerServiceEntity entity : controllerServiceEntities)
{
+ if (!affectedComponents.containsKey(entity.getId())) {
+ continue;
+ }
+
+ if
(ControllerServiceDTO.VALIDATING.equals(entity.getComponent().getValidationStatus()))
{
+ return false;
+ }
+ }
+ return true;
+ }
+
/**
* Periodically polls the process group with the given ID, waiting for all
controller services whose ID's are given to have the given Controller Service
State.
*
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java
index b597797..1a5fc6b 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java
@@ -105,6 +105,9 @@ public class LocalComponentLifecycle implements
ComponentLifecycle {
logger.debug("Starting components with ID's {} from Process Group {}",
componentRevisions.keySet(), processGroupId);
+ // Wait for all affected processors to be either VALID or INVALID
+ waitForProcessorValidation(processGroupId, affectedComponents, pause);
+
serviceFacade.verifyScheduleComponents(processGroupId,
ScheduledState.RUNNING, componentRevisions.keySet());
serviceFacade.scheduleComponents(processGroupId,
ScheduledState.RUNNING, componentRevisions);
@@ -130,6 +133,42 @@ public class LocalComponentLifecycle implements
ComponentLifecycle {
waitForProcessorState(processGroupId, affectedComponents,
ScheduledState.STOPPED, pause, invalidComponentAction);
}
+
+ /**
+ * Waits for all given Processors to complete validation
+ *
+ * @return <code>true</code> if all processors have completed validation,
<code>false</code> if the given {@link Pause}
+ * indicated to give up before all of the processors have
completed validation
+ */
+ private boolean waitForProcessorValidation(final String groupId, final
Map<String, AffectedComponentEntity> affectedComponents, final Pause pause) {
+
+ logger.debug("Waiting for {} processors to complete validation",
affectedComponents.size());
+ boolean continuePolling = true;
+ while (continuePolling) {
+ final Set<ProcessorEntity> processorEntities =
serviceFacade.getProcessors(groupId, true);
+ if (isProcessorValidationComplete(processorEntities,
affectedComponents)) {
+ logger.debug("All {} processors of interest have completed
validation", affectedComponents.size());
+ return true;
+ }
+ continuePolling = pause.pause();
+ }
+ return false;
+ }
+
+ private boolean isProcessorValidationComplete(final Set<ProcessorEntity>
processorEntities, final Map<String, AffectedComponentEntity>
affectedComponents) {
+ updateAffectedProcessors(processorEntities, affectedComponents);
+ for (final ProcessorEntity entity : processorEntities) {
+ if (!affectedComponents.containsKey(entity.getId())) {
+ continue;
+ }
+
+ if
(ProcessorDTO.VALIDATING.equals(entity.getComponent().getValidationStatus())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
/**
* Waits for all of the given Processors to reach the given Scheduled
State.
*
@@ -157,29 +196,33 @@ public class LocalComponentLifecycle implements
ComponentLifecycle {
return false;
}
+ private void updateAffectedProcessors(final Set<ProcessorEntity>
processorEntities, final Map<String, AffectedComponentEntity>
affectedComponents) {
+ // update the affected processors
+ processorEntities.stream()
+ .filter(entity ->
affectedComponents.containsKey(entity.getId()))
+ .forEach(entity -> {
+ final AffectedComponentEntity affectedComponentEntity =
affectedComponents.get(entity.getId());
+ affectedComponentEntity.setRevision(entity.getRevision());
+
+ // only consider updating this component if the user has
permissions to it
+ if
(Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
+ final AffectedComponentDTO affectedComponent =
affectedComponentEntity.getComponent();
+
affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus());
+
affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount());
+
+ if
(Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
+
affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors());
+ }
+ }
+ });
+ }
+
private boolean isProcessorActionComplete(final Set<ProcessorEntity>
processorEntities, final Map<String, AffectedComponentEntity>
affectedComponents, final ScheduledState desiredState,
final InvalidComponentAction
invalidComponentAction) throws LifecycleManagementException {
final String desiredStateName = desiredState.name();
- // update the affected processors
- processorEntities.stream()
- .filter(entity -> affectedComponents.containsKey(entity.getId()))
- .forEach(entity -> {
- final AffectedComponentEntity affectedComponentEntity =
affectedComponents.get(entity.getId());
- affectedComponentEntity.setRevision(entity.getRevision());
-
- // only consider updating this component if the user has
permissions to it
- if
(Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
- final AffectedComponentDTO affectedComponent =
affectedComponentEntity.getComponent();
-
affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus());
-
affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount());
-
- if
(Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
-
affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors());
- }
- }
- });
+ updateAffectedProcessors(processorEntities, affectedComponents);
for (final ProcessorEntity entity : processorEntities) {
if (!affectedComponents.containsKey(entity.getId())) {
@@ -224,6 +267,8 @@ public class LocalComponentLifecycle implements
ComponentLifecycle {
logger.debug("Enabling Controller Services with ID's {} from Process
Group {}", serviceRevisions.keySet(), processGroupId);
+ waitForControllerServiceValidation(processGroupId, affectedServices,
pause);
+
serviceFacade.verifyActivateControllerServices(processGroupId,
ControllerServiceState.ENABLED, affectedServices.keySet());
serviceFacade.activateControllerServices(processGroupId,
ControllerServiceState.ENABLED, serviceRevisions);
waitForControllerServiceState(processGroupId, affectedServices,
ControllerServiceState.ENABLED, pause, invalidComponentAction);
@@ -282,6 +327,39 @@ public class LocalComponentLifecycle implements
ComponentLifecycle {
}
}
+ /**
+ * Waits for all given Controller Services to complete validation
+ *
+ * @return <code>true</code> if all controller services have completed validation,
<code>false</code> if the given {@link Pause}
+ * indicated to give up before all of the controller services have
completed validation
+ */
+ private boolean waitForControllerServiceValidation(final String groupId,
final Map<String, AffectedComponentEntity> affectedComponents, final Pause
pause) {
+ logger.debug("Waiting for {} controller services to complete
validation", affectedComponents.size());
+ boolean continuePolling = true;
+ while (continuePolling) {
+ final Set<ControllerServiceEntity> serviceEntities =
serviceFacade.getControllerServices(groupId, false, true);
+ if (isControllerServiceValidationComplete(serviceEntities,
affectedComponents)) {
+ logger.debug("All {} controller services of interest have
completed validation", affectedComponents.size());
+ return true;
+ }
+ continuePolling = pause.pause();
+ }
+ return false;
+ }
+
+ private boolean isControllerServiceValidationComplete(final
Set<ControllerServiceEntity> controllerServiceEntities, final Map<String,
AffectedComponentEntity> affectedComponents) {
+ updateAffectedControllerServices(controllerServiceEntities,
affectedComponents);
+ for (final ControllerServiceEntity entity : controllerServiceEntities)
{
+ if (!affectedComponents.containsKey(entity.getId())) {
+ continue;
+ }
+
+ if
(ControllerServiceDTO.VALIDATING.equals(entity.getComponent().getValidationStatus()))
{
+ return false;
+ }
+ }
+ return true;
+ }
/**
* Periodically polls the process group with the given ID, waiting for all
controller services whose ID's are given to have the given Controller Service
State.