This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c3eff68f92 NIFI-12924 Flow Analysis no longer done regularly or
on-demand, but automatically when a change occurs that could result in the
change of violations.
c3eff68f92 is described below
commit c3eff68f92d34be6404e54c77fc6bcba89c3ad12
Author: tpalfy <[email protected]>
AuthorDate: Wed Mar 20 19:21:10 2024 +0100
NIFI-12924 Flow Analysis no longer done regularly or on-demand, but
automatically when a change occurs that could result in the change of
violations.
Signed-off-by: Pierre Villard <[email protected]>
This closes #8537.
---
.../java/org/apache/nifi/util/NiFiProperties.java | 3 -
.../web/api/entity/FlowAnalysisResultEntity.java | 13 +
.../apache/nifi/groups/StandardProcessGroup.java | 4 +
.../nifi/controller/flowanalysis/FlowAnalyzer.java | 12 +
.../org/apache/nifi/controller/FlowController.java | 8 +-
.../nifi/flowanalysis/StandardFlowAnalyzer.java | 12 +
.../nifi/flowanalysis/TriggerFlowAnalysisTask.java | 14 +-
.../nifi-framework/nifi-resources/pom.xml | 3 -
.../src/main/resources/conf/nifi.properties | 3 -
.../org/apache/nifi/web/NiFiServiceFacade.java | 8 -
.../apache/nifi/web/StandardNiFiServiceFacade.java | 21 +-
.../apache/nifi/web/api/ProcessGroupResource.java | 261 ---------------------
.../web/dao/impl/StandardFlowAnalysisRuleDAO.java | 12 +
.../resources/conf/clustered/node1/nifi.properties | 2 -
.../resources/conf/clustered/node2/nifi.properties | 2 -
.../test/resources/conf/default/nifi.properties | 2 -
.../test/resources/conf/pythonic/nifi.properties | 2 -
17 files changed, 69 insertions(+), 313 deletions(-)
diff --git
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 01dd887190..2ddee1d959 100644
---
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -300,9 +300,6 @@ public class NiFiProperties extends ApplicationProperties {
public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME =
"nifi.analytics.connection.model.score.name";
public static final String ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD =
"nifi.analytics.connection.model.score.threshold";
- // flow analysis properties
- public static final String BACKGROUND_FLOW_ANALYSIS_SCHEDULE =
"nifi.flow.analysis.background.task.schedule";
-
// runtime monitoring properties
public static final String MONITOR_LONG_RUNNING_TASK_SCHEDULE =
"nifi.monitor.long.running.task.schedule";
public static final String MONITOR_LONG_RUNNING_TASK_THRESHOLD =
"nifi.monitor.long.running.task.threshold";
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowAnalysisResultEntity.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowAnalysisResultEntity.java
index c9e7c79e2b..ef7c313763 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowAnalysisResultEntity.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowAnalysisResultEntity.java
@@ -32,9 +32,22 @@ public class FlowAnalysisResultEntity extends Entity {
public FlowAnalysisResultEntity() {
}
+ private boolean flowAnalysisPending;
+
private List<FlowAnalysisRuleDTO> rules = new ArrayList<>();
private List<FlowAnalysisRuleViolationDTO> ruleViolations = new
ArrayList<>();
+ /**
+ * @return true if a flow analysis is going to be scheduled due to flow
changes, false otherwise
+ */
+ public boolean isFlowAnalysisPending() {
+ return flowAnalysisPending;
+ }
+
+ public void setFlowAnalysisPending(boolean flowAnalysisPending) {
+ this.flowAnalysisPending = flowAnalysisPending;
+ }
+
/**
* @return set of flow analysis rules that are being serialized
*/
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 8721273373..c7775c99a8 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -3479,6 +3479,10 @@ public final class StandardProcessGroup implements
ProcessGroup {
}
versionControlFields.setFlowDifferences(null);
+
+ flowManager.getFlowAnalyzer().ifPresent(
+ flowManager -> flowManager.setFlowAnalysisRequired(true)
+ );
}
@Override
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flowanalysis/FlowAnalyzer.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flowanalysis/FlowAnalyzer.java
index 1c6be53371..743b8156a2 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flowanalysis/FlowAnalyzer.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flowanalysis/FlowAnalyzer.java
@@ -24,6 +24,18 @@ import org.apache.nifi.flow.VersionedProcessGroup;
* Analyzes components, parts or the entirety of the flow.
*/
public interface FlowAnalyzer {
+
+ /**
+ * Returns whether flow analysis should be scheduled
+ * @return true if flow analysis should be scheduled, false otherwise
+ */
+ boolean isFlowAnalysisRequired();
+
+ /**
+ * Sets whether flow analysis should be scheduled
+ */
+ void setFlowAnalysisRequired(boolean flowAnalysisRequired);
+
/**
* Analyzes a processor
*
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 727f71f25f..d3eb8644cc 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -1281,13 +1281,11 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
private void
scheduleBackgroundFlowAnalysis(Supplier<VersionedProcessGroup>
rootProcessGroupSupplier) {
if (flowAnalyzer != null) {
try {
- final long scheduleMillis =
parseDurationPropertyToMillis(NiFiProperties.BACKGROUND_FLOW_ANALYSIS_SCHEDULE);
-
flowAnalysisThreadPool.scheduleWithFixedDelay(
new TriggerFlowAnalysisTask(flowAnalyzer,
rootProcessGroupSupplier),
- scheduleMillis,
- scheduleMillis,
- TimeUnit.MILLISECONDS
+ 5,
+ 5,
+ TimeUnit.SECONDS
);
} catch (Exception e) {
LOG.warn("Could not initialize TriggerFlowAnalysisTask.", e);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/StandardFlowAnalyzer.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/StandardFlowAnalyzer.java
index eba4c81b05..1b8ca8e566 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/StandardFlowAnalyzer.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/StandardFlowAnalyzer.java
@@ -59,6 +59,8 @@ public class StandardFlowAnalyzer implements FlowAnalyzer {
private ControllerServiceProvider controllerServiceProvider;
+ private volatile boolean flowAnalysisRequired;
+
public StandardFlowAnalyzer(
final RuleViolationsManager ruleViolationsManager,
final FlowAnalysisRuleProvider flowAnalysisRuleProvider,
@@ -73,6 +75,16 @@ public class StandardFlowAnalyzer implements FlowAnalyzer {
this.controllerServiceProvider = controllerServiceProvider;
}
+ @Override
+ public boolean isFlowAnalysisRequired() {
+ return flowAnalysisRequired;
+ }
+
+ @Override
+ public void setFlowAnalysisRequired(boolean flowAnalysisRequired) {
+ this.flowAnalysisRequired = flowAnalysisRequired;
+ }
+
@Override
public void analyzeProcessor(ProcessorNode processorNode) {
logger.debug("Running analysis on {}", processorNode);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/TriggerFlowAnalysisTask.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/TriggerFlowAnalysisTask.java
index 56f86c3ade..97fd94ade5 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/TriggerFlowAnalysisTask.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/TriggerFlowAnalysisTask.java
@@ -36,12 +36,16 @@ public class TriggerFlowAnalysisTask implements Runnable {
@Override
public void run() {
- try {
+ if (flowAnalyzer.isFlowAnalysisRequired()) {
logger.debug("Triggering analysis of entire flow");
-
- flowAnalyzer.analyzeProcessGroup(rootProcessGroupSupplier.get());
- } catch (final Throwable t) {
- logger.error("Encountered unexpected error when attempting to
analyze flow", t);
+ try {
+
flowAnalyzer.analyzeProcessGroup(rootProcessGroupSupplier.get());
+ flowAnalyzer.setFlowAnalysisRequired(false);
+ } catch (final Throwable t) {
+ logger.error("Encountered unexpected error when attempting to
analyze flow", t);
+ }
+ } else {
+ logger.trace("Flow hasn't changed, flow analysis is put on hold");
}
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index 63e7308f24..49fc414acb 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -253,9 +253,6 @@
<nifi.analytics.connection.model.score.name>rSquared</nifi.analytics.connection.model.score.name>
<nifi.analytics.connection.model.score.threshold>.90</nifi.analytics.connection.model.score.threshold>
- <!-- nifi.properties: flow analysis properties -->
- <nifi.flow.analysis.background.task.schedule>5
mins</nifi.flow.analysis.background.task.schedule>
-
<!-- nifi.properties: runtime monitoring properties -->
<nifi.monitor.long.running.task.schedule>1
min</nifi.monitor.long.running.task.schedule>
<nifi.monitor.long.running.task.threshold>5
mins</nifi.monitor.long.running.task.threshold>
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 3147152535..f021c71a2f 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -337,9 +337,6 @@
nifi.analytics.connection.model.score.threshold=${nifi.analytics.connection.mode
# kubernetes #
nifi.cluster.leader.election.kubernetes.lease.prefix=${nifi.cluster.leader.election.kubernetes.lease.prefix}
-# flow analysis properties
-nifi.flow.analysis.background.task.schedule=${nifi.flow.analysis.background.task.schedule}
-
# runtime monitoring properties
nifi.monitor.long.running.task.schedule=
nifi.monitor.long.running.task.threshold=
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 71b1e92322..282f2a8486 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -2789,14 +2789,6 @@ public interface NiFiServiceFacade {
*/
FlowAnalysisRuleEntity deleteFlowAnalysisRule(Revision revision, String
flowAnalysisRuleId);
- /**
- * Analyze the flow or a part of it
- *
- * @param processGroupId The id of the process group representing (a part
of) the flow to be analyzed.
- * Recursive - all child process groups will be
analyzed as well.
- */
- void analyzeProcessGroup(String processGroupId);
-
/**
* @return all current rule violations
*/
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 96ec627f61..d2c0f1cd26 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -84,7 +84,6 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.VerifiableControllerService;
import org.apache.nifi.controller.flow.FlowManager;
-import org.apache.nifi.controller.flowanalysis.FlowAnalysisUtil;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.repository.FlowFileEvent;
@@ -6411,22 +6410,6 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
return flowAnalysisRules;
}
- @Override
- public void analyzeProcessGroup(String processGroupId) {
- ProcessGroup processGroup =
processGroupDAO.getProcessGroup(processGroupId);
-
- NiFiRegistryFlowMapper mapper =
FlowAnalysisUtil.createMapper(controllerFacade.getExtensionManager());
-
- InstantiatedVersionedProcessGroup nonVersionedProcessGroup =
mapper.mapNonVersionedProcessGroup(
- processGroup,
- controllerFacade.getControllerServiceProvider()
- );
-
- controllerFacade.getFlowManager().getFlowAnalyzer().ifPresent(
- flowAnalyzer ->
flowAnalyzer.analyzeProcessGroup(nonVersionedProcessGroup)
- );
- }
-
@Override
public FlowAnalysisResultEntity getFlowAnalysisResult() {
Collection<RuleViolation> ruleViolations =
ruleViolationsManager.getAllRuleViolations();
@@ -6463,6 +6446,10 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
public FlowAnalysisResultEntity
createFlowAnalysisResultEntity(Collection<RuleViolation> ruleViolations) {
FlowAnalysisResultEntity entity = new FlowAnalysisResultEntity();
+ controllerFacade.getFlowManager().getFlowAnalyzer().ifPresent(
+ flowAnalyzer ->
entity.setFlowAnalysisPending(flowAnalyzer.isFlowAnalysisRequired())
+ );
+
List<FlowAnalysisRuleDTO> flowAnalysisRuleDtos =
flowAnalysisRuleDAO.getFlowAnalysisRules().stream()
.filter(FlowAnalysisRuleNode::isEnabled)
.sorted(Comparator.comparing(FlowAnalysisRuleNode::getName))
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 765a536f39..a8e993645d 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -24,8 +24,6 @@ import
com.fasterxml.jackson.module.jakarta.xmlbind.JakartaXmlBindAnnotationIntr
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -43,7 +41,6 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import io.swagger.v3.oas.annotations.security.SecurityRequirement;
import io.swagger.v3.oas.annotations.tags.Tag;
-import jakarta.servlet.http.HttpServletRequest;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.DefaultValue;
@@ -55,7 +52,6 @@ import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
-import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
@@ -93,13 +89,7 @@ import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.concurrent.AsyncRequestManager;
-import org.apache.nifi.web.api.concurrent.AsynchronousWebRequest;
import org.apache.nifi.web.api.concurrent.RequestManager;
-import org.apache.nifi.web.api.concurrent.StandardAsynchronousWebRequest;
-import org.apache.nifi.web.api.concurrent.StandardUpdateStep;
-import org.apache.nifi.web.api.concurrent.UpdateStep;
-import org.apache.nifi.web.api.dto.AnalyzeFlowRequestDTO;
-import org.apache.nifi.web.api.dto.AnalyzeFlowRequestUpdateStepDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.DropRequestDTO;
@@ -114,7 +104,6 @@ import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
-import org.apache.nifi.web.api.entity.AnalyzeFlowRequestEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ConnectionsEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
@@ -3000,256 +2989,6 @@ public class ProcessGroupResource extends
FlowUpdateResource<ProcessGroupImportE
return deleteFlowUpdateRequest("replace-requests", replaceRequestId,
disconnectedNodeAcknowledged.booleanValue());
}
- // -------------
- // flow-analysis
- // -------------
-
- /**
- * Submits a request to run a flow analysis.
- *
- * @param processGroupId The id of the process group representing (a part
of) the flow to be analyzed
- * @return An AnalyzeFlowRequestEntity
- */
- @POST
- @Consumes(MediaType.WILDCARD)
- @Produces(MediaType.APPLICATION_JSON)
- @Path("{id}/flow-analysis-requests")
- @Operation(
- summary = "Executes a flow analysis for components within a given
process group",
- responses = @ApiResponse(content = @Content(schema =
@Schema(implementation = AnalyzeFlowRequestEntity.class))),
- security = {
- @SecurityRequirement(name = "Read - /process-groups/{uuid}
- For this and all encapsulated process groups")
- }
- )
- @ApiResponses(
- value = {
- @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
- @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
- @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
- @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
- @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
- }
- )
- public Response submitAnalyzeFlowRequest(
- @Parameter(
- description = "The id of the process group representing (a
part of) the flow to be analyzed.",
- required = true
- )
- @PathParam("id") final String processGroupId
- ) {
- if (isReplicateRequest()) {
- return replicate(HttpMethod.POST);
- }
-
- NiFiUser user = NiFiUserUtils.getNiFiUser();
-
- ProcessGroupEntity requestProcessGroupEntity = new
ProcessGroupEntity();
- requestProcessGroupEntity.setId(processGroupId);
-
- return withWriteLock(
- serviceFacade,
- requestProcessGroupEntity,
- lookup -> {
- final ProcessGroupAuthorizable processGroup =
lookup.getProcessGroup(processGroupId);
- processGroup.getAuthorizable().authorize(authorizer,
RequestAction.READ, user);
- },
- null,
- (processGroupEntity) -> {
- String analyzedGroupId = processGroupEntity.getId();
-
- final String requestId = generateUuid();
- final AsynchronousWebRequest<String, Void>
analyzeFlowAsyncWebRequest = new StandardAsynchronousWebRequest<>(
- requestId,
- analyzedGroupId,
- analyzedGroupId,
- user,
- Collections.singletonList(new
StandardUpdateStep("Analyze Process Group"))
- );
-
- // Submit the request to be performed in the background
- final Consumer<AsynchronousWebRequest<String, Void>>
analyzeFlowTask = asyncRequest -> {
- try {
- serviceFacade.analyzeProcessGroup(analyzedGroupId);
- asyncRequest.markStepComplete();
- } catch (final Exception e) {
- logger.error("Failed to run flow analysis on
process group {}", processGroupId, e);
- asyncRequest.fail("Failed to run flow analysis on
process group " + processGroupId + " due to " + e);
- }
- };
- flowAnalysisAsyncRequestManager.submitRequest(
- FLOW_ANALYSIS_REQUEST_TYPE,
- requestId,
- analyzeFlowAsyncWebRequest,
- analyzeFlowTask
- );
-
- return
generateOkResponse(createAnalyzeFlowRequestEntity(analyzeFlowAsyncWebRequest,
requestId)).build();
- }
- );
- }
-
- /**
- * Checks the status of an outstanding request for a flow analysis.
- *
- * @param requestId The id of flow analysis request
- * @return An analyzeFlowRequestEntity
- */
- @GET
- @Consumes(MediaType.WILDCARD)
- @Produces(MediaType.APPLICATION_JSON)
- @Path("{id}/flow-analysis-requests/{requestId}")
- @Operation(
- summary = "Gets the current status of a flow analysis request.",
- responses = @ApiResponse(content = @Content(schema =
@Schema(implementation = AnalyzeFlowRequestEntity.class))),
- security = {
- @SecurityRequirement(name = "Read - /process-groups/{uuid}
- For this and all encapsulated process groups")
- }
- )
- @ApiResponses(
- value = {
- @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
- @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
- @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
- @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
- @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
- }
- )
- public Response getAnalyzeFlowRequest(
- @Parameter(
- description = "The id of the process group representing (a
part of) the flow being analyzed.",
- required = true
- )
- @PathParam("id") final String processGroupId,
- @Parameter(
- description = "The id of the process group representing (a
part of) the flow to be analyzed.",
- required = true
- )
- @PathParam("requestId") final String requestId
- ) {
- if (isReplicateRequest()) {
- return replicate(HttpMethod.GET);
- }
-
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
-
- // request manager will ensure that the current is the user that
submitted this request
- final AsynchronousWebRequest<String, Void> asyncRequest =
-
flowAnalysisAsyncRequestManager.getRequest(FLOW_ANALYSIS_REQUEST_TYPE,
requestId, user);
-
- return generateOkResponse(createAnalyzeFlowRequestEntity(asyncRequest,
requestId)).build();
- }
-
- /**
- * Cancels the specified flow analysis request.
- *
- * @param httpServletRequest request
- * @param requestId The id of the flow analysis request
- * @return An analyzeFlowRequestEntity
- */
- @DELETE
- @Consumes(MediaType.WILDCARD)
- @Produces(MediaType.APPLICATION_JSON)
- @Path("{id}/flow-analysis-requests/{requestId}")
- @Operation(
- summary = "Cancels a flow analysis request.",
- responses = @ApiResponse(content = @Content(schema =
@Schema(implementation = AnalyzeFlowRequestEntity.class))),
- security = {
- @SecurityRequirement(name = "Read - /process-groups/{uuid}
- For this and all encapsulated process groups")
- }
- )
- @ApiResponses(
- value = {
- @ApiResponse(responseCode = "400", description = "NiFi was
unable to complete the request because it was invalid. The request should not
be retried without modification."),
- @ApiResponse(responseCode = "401", description = "Client
could not be authenticated."),
- @ApiResponse(responseCode = "403", description = "Client
is not authorized to make this request."),
- @ApiResponse(responseCode = "404", description = "The
specified resource could not be found."),
- @ApiResponse(responseCode = "409", description = "The
request was valid but NiFi was not in the appropriate state to process it.")
- }
- )
- public Response removeAnalyzeFlowRequest(
- @Parameter(
- description = "The id of the process group representing (a
part of) the flow being analyzed.",
- required = true
- )
- @PathParam("id") final String processGroupId,
- @Context final HttpServletRequest httpServletRequest,
- @Parameter(
- description = "The id of the flow analysis request",
- required = true
- )
- @PathParam("requestId") final String requestId
- ) {
- if (isReplicateRequest()) {
- return replicate(HttpMethod.DELETE);
- }
-
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
- final boolean twoPhaseRequest = isTwoPhaseRequest(httpServletRequest);
- final boolean executionPhase = isExecutionPhase(httpServletRequest);
-
- // If this is a standalone node, or if this is the execution phase of
the request, perform the actual request.
- if (!twoPhaseRequest || executionPhase) {
- // request manager will ensure that the current is the user that
submitted this request
- final AsynchronousWebRequest<String, Void> asyncRequest =
-
flowAnalysisAsyncRequestManager.removeRequest(FLOW_ANALYSIS_REQUEST_TYPE,
requestId, user);
-
- if (asyncRequest == null) {
- throw new ResourceNotFoundException("Could not find request of
type " + FLOW_ANALYSIS_REQUEST_TYPE + " with ID " + requestId);
- }
-
- if (!asyncRequest.isComplete()) {
- asyncRequest.cancel();
- }
-
- AnalyzeFlowRequestEntity analyzeFlowRequestEntity =
createAnalyzeFlowRequestEntity(asyncRequest, requestId);
- return generateOkResponse(analyzeFlowRequestEntity).build();
- }
-
- if (isValidationPhase(httpServletRequest)) {
- // Perform authorization by attempting to get the request
-
flowAnalysisAsyncRequestManager.getRequest(FLOW_ANALYSIS_REQUEST_TYPE,
requestId, user);
- return generateContinueResponse().build();
- } else if (isCancellationPhase(httpServletRequest)) {
- return generateOkResponse().build();
- } else {
- throw new IllegalStateException("This request does not appear to
be part of the two phase commit.");
- }
- }
-
- private AnalyzeFlowRequestEntity createAnalyzeFlowRequestEntity(
- final AsynchronousWebRequest<String, Void> asyncRequest,
- final String requestId
- ) {
- String analyzedGroupId = asyncRequest.getRequest();
-
- AnalyzeFlowRequestDTO responseDto = new AnalyzeFlowRequestDTO();
- responseDto.setProcessGroupId(analyzedGroupId);
-
- responseDto.setRequestId(requestId);
- responseDto.setComplete(asyncRequest.isComplete());
- responseDto.setFailureReason(asyncRequest.getFailureReason());
- responseDto.setLastUpdated(asyncRequest.getLastUpdated());
- responseDto.setPercentCompleted(asyncRequest.getPercentComplete());
- responseDto.setState(asyncRequest.getState());
- responseDto.setUri(generateResourceUri("process-groups",
"flow-analysis", analyzedGroupId));
-
- final List<AnalyzeFlowRequestUpdateStepDTO> updateSteps = new
ArrayList<>();
- for (final UpdateStep updateStep : asyncRequest.getUpdateSteps()) {
- final AnalyzeFlowRequestUpdateStepDTO updateStepDTO = new
AnalyzeFlowRequestUpdateStepDTO();
- updateStepDTO.setDescription(updateStep.getDescription());
- updateStepDTO.setComplete(updateStep.isComplete());
- updateStepDTO.setFailureReason(updateStep.getFailureReason());
- updateSteps.add(updateStepDTO);
- }
- responseDto.setUpdateSteps(updateSteps);
-
- AnalyzeFlowRequestEntity analyzeFlowRequestEntity = new
AnalyzeFlowRequestEntity();
- analyzeFlowRequestEntity.setAnalyzeFlowRequest(responseDto);
-
- return analyzeFlowRequestEntity;
- }
- //--
-
/**
* Perform actual flow update of the specified flow. This is used for the
initial flow update and replication updates.
*/
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowAnalysisRuleDAO.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowAnalysisRuleDAO.java
index 0d3365f1b2..50ad24d165 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowAnalysisRuleDAO.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowAnalysisRuleDAO.java
@@ -100,6 +100,10 @@ public class StandardFlowAnalysisRuleDAO extends
ComponentDAO implements FlowAna
// perform the update
configureFlowAnalysisRule(flowAnalysisRule, flowAnalysisRuleDTO);
+ flowController.getFlowManager().getFlowAnalyzer().ifPresent(
+ flowAnalyzer -> flowAnalyzer.setFlowAnalysisRequired(true)
+ );
+
return flowAnalysisRule;
} catch (FlowAnalysisRuleInstantiationException rtie) {
throw new NiFiCoreException(rtie.getMessage(), rtie);
@@ -163,6 +167,10 @@ public class StandardFlowAnalysisRuleDAO extends
ComponentDAO implements FlowAna
}
}
+ flowController.getFlowManager().getFlowAnalyzer().ifPresent(
+ flowAnalyzer -> flowAnalyzer.setFlowAnalysisRequired(true)
+ );
+
return flowAnalysisRule;
}
@@ -326,6 +334,10 @@ public class StandardFlowAnalysisRuleDAO extends
ComponentDAO implements FlowAna
public void deleteFlowAnalysisRule(String flowAnalysisRuleId) {
final FlowAnalysisRuleNode flowAnalysisRule =
locateFlowAnalysisRule(flowAnalysisRuleId);
flowAnalysisRuleProvider.removeFlowAnalysisRule(flowAnalysisRule);
+
+ flowController.getFlowManager().getFlowAnalyzer().ifPresent(
+ flowAnalyzer -> flowAnalyzer.setFlowAnalysisRequired(true)
+ );
}
@Override
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
index f9ef29179d..d914e8e643 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
@@ -253,5 +253,3 @@ nifi.kerberos.service.keytab.location=
nifi.kerberos.spnego.principal=
nifi.kerberos.spnego.keytab.location=
nifi.kerberos.spnego.authentication.expiration=12 hours
-
-nifi.flow.analysis.background.task.schedule=5 mins
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
index 8b45f32b17..5bb68875e7 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
@@ -253,5 +253,3 @@ nifi.kerberos.service.keytab.location=
nifi.kerberos.spnego.principal=
nifi.kerberos.spnego.keytab.location=
nifi.kerberos.spnego.authentication.expiration=12 hours
-
-nifi.flow.analysis.background.task.schedule=5 mins
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
index 4758d714dc..b2f1a05240 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
@@ -254,5 +254,3 @@ nifi.kerberos.service.keytab.location=
nifi.kerberos.spnego.principal=
nifi.kerberos.spnego.keytab.location=
nifi.kerberos.spnego.authentication.expiration=12 hours
-
-nifi.flow.analysis.background.task.schedule=5 mins
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/pythonic/nifi.properties
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/pythonic/nifi.properties
index faf92c6a96..d0efbde51d 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/pythonic/nifi.properties
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/pythonic/nifi.properties
@@ -262,5 +262,3 @@ nifi.kerberos.spnego.authentication.expiration=12 hours
# external properties files for variable registry
# supports a comma delimited list of file locations
nifi.variable.registry.properties=
-
-nifi.flow.analysis.background.task.schedule=5 mins