This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 47f45ab NIFI-6748: Fixed bug in Parameter Contexts' affected
components where if a Controller Service referenced a Parameter, any component
that references that service should also be considered an affected component
but wasn't. Also fixed a bug in how we handled stopping a Processor that was in
the STARTING phase.
47f45ab is described below
commit 47f45abdf9d99ac4b46dbed56c6dac3e0404d8be
Author: Mark Payne <[email protected]>
AuthorDate: Tue Oct 8 14:19:40 2019 -0400
NIFI-6748: Fixed bug in Parameter Contexts' affected components where if a
Controller Service referenced a Parameter, any component that references that
service should also be considered an affected component but wasn't. Also fixed
a bug in how we handled stopping a Processor that was in the STARTING phase.
This closes #3794.
Signed-off-by: Bryan Bende <[email protected]>
---
.../web/api/entity/AffectedComponentEntity.java | 18 +++
.../nifi/controller/StandardProcessorNode.java | 11 +-
.../apache/nifi/groups/StandardProcessGroup.java | 4 +-
.../apache/nifi/web/StandardNiFiServiceFacade.java | 121 +++++++++++++--------
4 files changed, 105 insertions(+), 49 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java
index 7ef93a7..8ba95c3 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java
@@ -21,6 +21,7 @@ import org.apache.nifi.web.api.dto.AffectedComponentDTO;
import org.apache.nifi.web.api.dto.ProcessGroupNameDTO;
import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Objects;
/**
* A serialized representation of this class can be placed in the entity body
of a response to the API.
@@ -71,4 +72,21 @@ public class AffectedComponentEntity extends ComponentEntity
implements Permissi
public String toString() {
return component == null ? "AffectedComponent[No Component]" :
component.toString();
}
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(getId());
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof AffectedComponentEntity)) {
+ return false;
+ }
+
+ return Objects.equals(getId(), ((AffectedComponentEntity)
obj).getId());
+ }
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 92805e7..96402a4 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -1361,6 +1361,7 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
final Processor processor = processorRef.get().getProcessor();
final ComponentLog procLog = new
SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
+ LOG.info("Starting {}", this);
ScheduledState currentState;
boolean starting;
@@ -1498,7 +1499,7 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
final ProcessContext processContext = processContextFactory.get();
final ScheduledState currentScheduleState = scheduledState.get();
- if (currentScheduleState == ScheduledState.STOPPING ||
currentScheduleState == ScheduledState.STOPPED) {
+ if (currentScheduleState == ScheduledState.STOPPING ||
currentScheduleState == ScheduledState.STOPPED || getDesiredState() ==
ScheduledState.STOPPED) {
LOG.debug("{} is stopped. Will not call @OnScheduled lifecycle
methods or begin trigger onTrigger() method", StandardProcessorNode.this);
schedulingAgentCallback.onTaskComplete();
scheduledState.set(ScheduledState.STOPPED);
@@ -1648,7 +1649,7 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
final SchedulingAgent schedulingAgent, final LifecycleState
scheduleState) {
final Processor processor = processorRef.get().getProcessor();
- LOG.info("Stopping processor: " + processor.getClass());
+ LOG.info("Stopping processor: " + this);
desiredState = ScheduledState.STOPPED;
final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -1722,7 +1723,11 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
// before stop() was called. If that happens the stop processor
// routine will be initiated in start() method, otherwise the IF
// part will handle the stop processor routine.
- this.scheduledState.compareAndSet(ScheduledState.STARTING,
ScheduledState.STOPPING);
+ final boolean updated =
this.scheduledState.compareAndSet(ScheduledState.STARTING,
ScheduledState.STOPPING);
+ if (updated) {
+ LOG.debug("Transitioned state of {} from STARTING to
STOPPING", this);
+ }
+
future.complete(null);
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index b52d68c..f04286a 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -1397,8 +1397,6 @@ public final class StandardProcessGroup implements
ProcessGroup {
final ScheduledState state = processor.getScheduledState();
if (state == ScheduledState.DISABLED) {
throw new IllegalStateException("Processor is disabled");
- } else if (state == ScheduledState.STOPPED) {
- return CompletableFuture.completedFuture(null);
}
return scheduler.stopProcessor(processor);
@@ -2954,7 +2952,7 @@ public final class StandardProcessGroup implements
ProcessGroup {
}
if (service.getState() != ControllerServiceState.DISABLED) {
- throw new IllegalStateException("Cannot change Parameter
Context for " + this + " because " + service + " is referencing at least one
Parameter is is not disabled");
+ throw new IllegalStateException("Cannot change Parameter
Context for " + this + " because " + service + " is referencing at least one
Parameter and is not disabled");
}
verifyParameterSensitivityIsValid(service, parameterContext);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 9c96cc5..6532f51 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -16,35 +16,7 @@
*/
package org.apache.nifi.web;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-
+import com.google.common.collect.Sets;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
@@ -338,7 +310,33 @@ import org.apache.nifi.web.util.SnippetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Sets;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Implementation of NiFiServiceFacade that performs revision checking.
@@ -1282,22 +1280,21 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
for (final ControllerServiceNode service :
group.getControllerServices(false)) {
if (includeInactive || service.isActive()) {
final Set<String> referencedParams =
service.getReferencedParameterNames();
- final boolean referencesUpdatedParam =
referencedParams.stream().anyMatch(updatedParameterNames::contains);
-
- if (referencesUpdatedParam) {
- affectedComponents.add(service);
-
- final AffectedComponentEntity affectedComponentEntity
= dtoFactory.createAffectedComponentEntity(service, revisionManager);
-
- for (final String referencedParam : referencedParams) {
- for (final ParameterEntity paramEntity :
parameterContextDto.getParameters()) {
- final ParameterDTO paramDto =
paramEntity.getParameter();
- if
(referencedParam.equals(paramDto.getName())) {
-
paramDto.getReferencingComponents().add(affectedComponentEntity);
- }
+ final Set<String> updatedReferencedParams =
referencedParams.stream().filter(updatedParameterNames::contains).collect(Collectors.toSet());
+
+ final List<ParameterDTO> affectedParameterDtos = new
ArrayList<>();
+ for (final String referencedParam : referencedParams) {
+ for (final ParameterEntity paramEntity :
parameterContextDto.getParameters()) {
+ final ParameterDTO paramDto =
paramEntity.getParameter();
+ if (referencedParam.equals(paramDto.getName())) {
+ affectedParameterDtos.add(paramDto);
}
}
}
+
+ if (!updatedReferencedParams.isEmpty()) {
+ addReferencingComponents(service, affectedComponents,
affectedParameterDtos, includeInactive);
+ }
}
}
}
@@ -1305,6 +1302,44 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
return dtoFactory.createAffectedComponentEntities(affectedComponents,
revisionManager);
}
+ private void addReferencingComponents(final ControllerServiceNode service,
final Set<ComponentNode> affectedComponents, final List<ParameterDTO>
affectedParameterDtos,
+ final boolean includeInactive) {
+
+ // We keep a mapping of Affected Components for the Parameter Context
Update as well as a set of all Affected Components for each updated Parameter.
+ // We must update both of these.
+ affectedComponents.add(service);
+
+ // Update Parameter DTO to also reflect the Affected Component.
+ final AffectedComponentEntity affectedComponentEntity =
dtoFactory.createAffectedComponentEntity(service, revisionManager);
+ affectedParameterDtos.forEach(dto ->
dto.getReferencingComponents().add(affectedComponentEntity));
+
+ for (final ComponentNode referencingComponent :
service.getReferences().getReferencingComponents()) {
+ if (includeInactive || isActive(referencingComponent)) {
+ // We must update both the Set of Affected Components as well
as the Affected Components for the referenced parameter.
+ affectedComponents.add(referencingComponent);
+
+ final AffectedComponentEntity referencingComponentEntity =
dtoFactory.createAffectedComponentEntity(referencingComponent, revisionManager);
+ affectedParameterDtos.forEach(dto ->
dto.getReferencingComponents().add(referencingComponentEntity));
+
+ if (referencingComponent instanceof ControllerServiceNode) {
+ addReferencingComponents((ControllerServiceNode)
referencingComponent, affectedComponents, affectedParameterDtos,
includeInactive);
+ }
+ }
+ }
+ }
+
+ private boolean isActive(final ComponentNode componentNode) {
+ if (componentNode instanceof ControllerServiceNode) {
+ return ((ControllerServiceNode) componentNode).isActive();
+ }
+
+ if (componentNode instanceof ProcessorNode) {
+ return ((ProcessorNode) componentNode).isRunning();
+ }
+
+ return false;
+ }
+
private Set<String> getUpdatedParameterNames(final ParameterContextDTO
parameterContextDto) {
final ParameterContext parameterContext =
parameterContextDAO.getParameterContext(parameterContextDto.getId());