This is an automated email from the ASF dual-hosted git repository.

bejancsaba pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new dca7aac9b3 NIFI-13541 [MiNiFi] Validate flow during startup
dca7aac9b3 is described below

commit dca7aac9b3f3b9e79ed37572d17f69c89d295470
Author: Ferenc Kis <[email protected]>
AuthorDate: Thu Jul 11 12:35:07 2024 +0200

    NIFI-13541 [MiNiFi] Validate flow during startup
    
    Signed-off-by: Csaba Bejan <[email protected]>
    
    This closes #9073.
---
 .../nifi/minifi/commons/utils/RetryUtil.java       |  47 ++++++
 .../minifi-framework/minifi-framework-core/pom.xml |   5 +
 .../DefaultUpdateConfigurationStrategy.java        | 162 ++++++---------------
 .../nifi/minifi/validator/FlowValidator.java       |  98 +++++++++++++
 .../apache/nifi/minifi/StandardMiNiFiServer.java   |  33 +++--
 .../apache/nifi/headless/HeadlessNiFiServer.java   |   5 +
 6 files changed, 214 insertions(+), 136 deletions(-)

diff --git 
a/minifi/minifi-commons/minifi-commons-utils/src/main/java/org/apache/nifi/minifi/commons/utils/RetryUtil.java
 
b/minifi/minifi-commons/minifi-commons-utils/src/main/java/org/apache/nifi/minifi/commons/utils/RetryUtil.java
new file mode 100644
index 0000000000..74411aad6e
--- /dev/null
+++ 
b/minifi/minifi-commons/minifi-commons-utils/src/main/java/org/apache/nifi/minifi/commons/utils/RetryUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.utils;
+
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+/**
+ * Utility for polling a value until it satisfies a condition or a retry budget is exhausted.
+ */
+public final class RetryUtil {
+
+    private RetryUtil() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Repeatedly evaluates {@code input} until {@code predicate} accepts the produced value or the retries run out.
+     * The supplier is invoked at most {@code maxRetries + 1} times, with a pause of {@code pauseDurationMillis}
+     * between two consecutive attempts.
+     *
+     * <p>An interrupt received while pausing cuts that pause short but does not abort the loop; the interrupt
+     * status of the calling thread is restored before this method returns so callers can still observe it.
+     *
+     * @param input supplies the value to check on every attempt
+     * @param predicate decides whether a supplied value ends the loop successfully
+     * @param maxRetries number of additional attempts allowed after the first one
+     * @param pauseDurationMillis pause between two attempts, in milliseconds
+     * @param <T> type of the checked value
+     * @return an empty {@code Optional} when the predicate accepted a value; otherwise an {@code Optional} of the
+     *         last rejected value (also empty if that value was {@code null})
+     */
+    public static <T> Optional<T> retry(Supplier<T> input, Predicate<T> predicate, int maxRetries, int pauseDurationMillis) {
+        boolean interrupted = false;
+        try {
+            for (int attempt = 0; ; attempt++) {
+                T candidate = input.get();
+                if (predicate.test(candidate)) {
+                    return Optional.empty();
+                }
+                if (attempt == maxRetries) {
+                    return Optional.ofNullable(candidate);
+                }
+                try {
+                    Thread.sleep(pauseDurationMillis);
+                } catch (InterruptedException e) {
+                    // Thread.sleep cleared the interrupt status; remember it instead of silently dropping it
+                    interrupted = true;
+                }
+            }
+        } finally {
+            if (interrupted) {
+                // Re-assert the interrupt only on exit, so the remaining pauses are not cut short
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+}
diff --git 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml
 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml
index 37ed6dd95f..addc049ebf 100644
--- 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml
+++ 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml
@@ -145,6 +145,11 @@ limitations under the License.
             <artifactId>nifi-framework-core</artifactId>
             <version>2.0.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi.minifi</groupId>
+            <artifactId>minifi-commons-utils</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-python-framework-api</artifactId>
diff --git 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java
 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java
index d5ccac1e53..47504750be 100644
--- 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java
+++ 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java
@@ -17,15 +17,35 @@
 
 package org.apache.nifi.minifi.c2.command;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.emptySet;
+import static java.util.UUID.randomUUID;
+import static java.util.function.Predicate.not;
+import static 
org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.backup;
+import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.persist;
+import static 
org.apache.nifi.minifi.commons.util.FlowUpdateUtils.removeIfExists;
+import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.revert;
+import static org.apache.nifi.minifi.commons.utils.RetryUtil.retry;
+import static org.apache.nifi.minifi.validator.FlowValidator.validate;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
-import org.apache.nifi.components.AsyncLoadedProcessor;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.ComponentNode;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessorNode;
-import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.flow.VersionedDataflow;
 import org.apache.nifi.flow.VersionedConnection;
 import org.apache.nifi.flow.VersionedProcessGroup;
@@ -38,49 +58,12 @@ import org.apache.nifi.services.FlowService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Collections.emptySet;
-import static java.util.UUID.randomUUID;
-import static java.util.function.Predicate.not;
-import static 
org.apache.nifi.components.AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES;
-import static 
org.apache.nifi.components.AsyncLoadedProcessor.LoadState.INITIALIZING_ENVIRONMENT;
-import static 
org.apache.nifi.components.AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE;
-import static org.apache.nifi.components.validation.ValidationStatus.INVALID;
-import static 
org.apache.nifi.components.validation.ValidationStatus.VALIDATING;
-import static 
org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
-import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
-import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.backup;
-import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.persist;
-import static 
org.apache.nifi.minifi.commons.util.FlowUpdateUtils.removeIfExists;
-import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.revert;
-
 public class DefaultUpdateConfigurationStrategy implements 
UpdateConfigurationStrategy {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultUpdateConfigurationStrategy.class);
 
-    private static final Set<AsyncLoadedProcessor.LoadState> 
INITIALIZING_ASYNC_PROCESSOR_STATES =
-        Set.of(INITIALIZING_ENVIRONMENT, DOWNLOADING_DEPENDENCIES, 
LOADING_PROCESSOR_CODE);
-
-    private static int ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS = 
5000;
-    private static int ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES = 60;
-    private static int VALIDATION_RETRY_PAUSE_DURATION_MS = 1000;
-    private static int VALIDATION_MAX_RETRIES = 5;
-    private static int FLOW_DRAIN_RETRY_PAUSE_DURATION_MS = 1000;
-    private static int FLOW_DRAIN_MAX_RETRIES = 60;
+    private static final int FLOW_DRAIN_RETRY_PAUSE_DURATION_MS = 1000;
+    private static final int FLOW_DRAIN_MAX_RETRIES = 60;
 
     private final FlowController flowController;
     private final FlowService flowService;
@@ -115,9 +98,9 @@ public class DefaultUpdateConfigurationStrategy implements 
UpdateConfigurationSt
         Set<String> originalConnectionIds = emptySet();
         try {
             originalConnectionIds = 
findAllExistingConnections(flowController.getFlowManager().getRootGroup())
-                    .stream()
-                    .map(Connection::getIdentifier)
-                    .collect(Collectors.toSet());
+                .stream()
+                .map(Connection::getIdentifier)
+                .collect(Collectors.toSet());
             VersionedDataflow rawDataFlow = 
flowSerDeService.deserialize(rawFlow);
 
             VersionedDataflow propertyEncryptedRawDataFlow = 
flowPropertyEncryptor.encryptSensitiveProperties(rawDataFlow);
@@ -187,9 +170,9 @@ public class DefaultUpdateConfigurationStrategy implements 
UpdateConfigurationSt
         return rootProcessGroup -> {
             LOGGER.warn("Flow did not stop within graceful period. Force 
stopping flow and emptying non referenced queues");
             findAllExistingConnections(rootProcessGroup).stream()
-                    .filter(connection -> 
!proposedConnectionIds.contains(connection.getIdentifier()))
-                    .map(Connection::getFlowFileQueue)
-                    .forEach(queue -> 
queue.dropFlowFiles(randomUUID().toString(), randomUUID().toString()));
+                .filter(connection -> 
!proposedConnectionIds.contains(connection.getIdentifier()))
+                .map(Connection::getFlowFileQueue)
+                .forEach(queue -> queue.dropFlowFiles(randomUUID().toString(), 
randomUUID().toString()));
         };
     }
 
@@ -216,84 +199,21 @@ public class DefaultUpdateConfigurationStrategy 
implements UpdateConfigurationSt
         }
     }
 
-    private List<ValidationResult> validate(FlowManager flowManager) {
-        List<? extends ComponentNode> componentNodes = 
extractComponentNodes(flowManager);
-
-        retry(() -> initializingAsyncLoadingComponents(componentNodes), 
List::isEmpty,
-            ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES, 
ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS)
-            .ifPresent(components -> {
-                LOGGER.error("The following components are async loading 
components and are still initializing: {}", components);
-                throw new IllegalStateException("Maximum retry number exceeded 
while waiting for async loading components to be initialized");
-            });
-
-        retry(() -> componentsInValidatingState(componentNodes), 
List::isEmpty, VALIDATION_MAX_RETRIES, VALIDATION_RETRY_PAUSE_DURATION_MS)
-            .ifPresent(components -> {
-                LOGGER.error("The following components are still in VALIDATING 
state: {}", components);
-                throw new IllegalStateException("Maximum retry number exceeded 
while waiting for components to be validated");
-            });
-
-        return componentNodes.stream()
-            .map(ComponentNode::getValidationErrors)
-            .flatMap(Collection::stream)
-            .toList();
-    }
-
-    private List<? extends ComponentNode> extractComponentNodes(FlowManager 
flowManager) {
-        return Stream.of(
-                flowManager.getAllControllerServices(),
-                flowManager.getAllReportingTasks(),
-                Set.copyOf(flowManager.getRootGroup().findAllProcessors()))
-            .flatMap(Set::stream)
-            .toList();
-    }
-
-    private List<? extends ComponentNode> componentsInValidatingState(List<? 
extends ComponentNode> componentNodes) {
-        return componentNodes.stream()
-            .filter(componentNode -> componentNode.performValidation() == 
VALIDATING)
-            .toList();
-    }
-
-    private List<? extends ComponentNode> 
initializingAsyncLoadingComponents(List<? extends ComponentNode> 
componentNodes) {
-        return componentNodes.stream()
-            .filter(componentNode -> componentNode.performValidation() == 
INVALID)
-            .filter(componentNode -> componentNode.getComponent() instanceof 
AsyncLoadedProcessor asyncLoadedProcessor
-                && 
INITIALIZING_ASYNC_PROCESSOR_STATES.contains(asyncLoadedProcessor.getState()))
-            .toList();
-    }
-
-    private <T> Optional<T> retry(Supplier<T> input, Predicate<T> predicate, 
int maxRetries, int pauseDurationMillis) {
-        int retries = 0;
-        while (true) {
-            T t = input.get();
-            if (predicate.test(t)) {
-                return Optional.empty();
-            }
-            if (retries == maxRetries) {
-                return Optional.ofNullable(t);
-            }
-            retries++;
-            try {
-                Thread.sleep(pauseDurationMillis);
-            } catch (InterruptedException e) {
-            }
-        }
-    }
-
     private Set<String> findAllProposedConnectionIds(VersionedProcessGroup 
versionedProcessGroup) {
         return versionedProcessGroup == null
-                ? emptySet()
-                : Stream.concat(
-                    
versionedProcessGroup.getConnections().stream().map(VersionedConnection::getInstanceIdentifier),
-                    
versionedProcessGroup.getProcessGroups().stream().map(this::findAllProposedConnectionIds).flatMap(Set::stream)
-                ).collect(Collectors.toSet());
+            ? emptySet()
+            : Stream.concat(
+                
versionedProcessGroup.getConnections().stream().map(VersionedConnection::getInstanceIdentifier),
+                
versionedProcessGroup.getProcessGroups().stream().map(this::findAllProposedConnectionIds).flatMap(Set::stream)
+            ).collect(Collectors.toSet());
     }
 
     private Set<Connection> findAllExistingConnections(ProcessGroup 
processGroup) {
         return processGroup == null
-                ? emptySet()
-                : Stream.concat(
-                    processGroup.getConnections().stream(),
-                    
processGroup.getProcessGroups().stream().map(this::findAllExistingConnections).flatMap(Set::stream)
-                ).collect(Collectors.toSet());
+            ? emptySet()
+            : Stream.concat(
+                processGroup.getConnections().stream(),
+                
processGroup.getProcessGroups().stream().map(this::findAllExistingConnections).flatMap(Set::stream)
+            ).collect(Collectors.toSet());
     }
 }
diff --git 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/validator/FlowValidator.java
 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/validator/FlowValidator.java
new file mode 100644
index 0000000000..e479b57dc1
--- /dev/null
+++ 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/validator/FlowValidator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.validator;
+
+import static 
org.apache.nifi.components.AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES;
+import static 
org.apache.nifi.components.AsyncLoadedProcessor.LoadState.INITIALIZING_ENVIRONMENT;
+import static 
org.apache.nifi.components.AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE;
+import static org.apache.nifi.components.validation.ValidationStatus.INVALID;
+import static 
org.apache.nifi.components.validation.ValidationStatus.VALIDATING;
+import static org.apache.nifi.minifi.commons.utils.RetryUtil.retry;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.nifi.components.AsyncLoadedProcessor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static helper that waits for the components of a flow to finish initializing and validating,
+ * then gathers the validation errors they report.
+ */
+public final class FlowValidator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FlowValidator.class);
+
+    // Load states in which an async loaded processor counts as still initializing
+    private static final Set<AsyncLoadedProcessor.LoadState> INITIALIZING_ASYNC_PROCESSOR_STATES =
+        Set.of(INITIALIZING_ENVIRONMENT, DOWNLOADING_DEPENDENCIES, LOADING_PROCESSOR_CODE);
+
+    // Retry budget while waiting for async loading components to initialize
+    private static final int ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS = 5000;
+    private static final int ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES = 60;
+    // Retry budget while waiting for components to leave the VALIDATING state
+    private static final int VALIDATION_RETRY_PAUSE_DURATION_MS = 1000;
+    private static final int VALIDATION_MAX_RETRIES = 5;
+
+    private FlowValidator() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Collects the validation errors of all controller services, reporting tasks and processors of the flow.
+     * Before collecting, waits (with retries) until no async loading component is initializing any more and
+     * until no component is in VALIDATING state.
+     *
+     * @param flowManager flow manager whose components are validated
+     * @return validation errors of all inspected components; empty when none was reported
+     * @throws IllegalStateException if either wait runs out of retries
+     */
+    public static List<ValidationResult> validate(FlowManager flowManager) {
+        List<? extends ComponentNode> nodes = extractComponentNodes(flowManager);
+
+        // A present result carries the components that were still initializing after the last retry
+        var stillInitializing = retry(() -> initializingAsyncLoadingComponents(nodes), List::isEmpty,
+            ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES, ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS);
+        if (stillInitializing.isPresent()) {
+            LOGGER.error("The following components are async loading components and are still initializing: {}", stillInitializing.get());
+            throw new IllegalStateException("Maximum retry number exceeded while waiting for async loading components to be initialized");
+        }
+
+        // A present result carries the components that were still validating after the last retry
+        var stillValidating = retry(() -> componentsInValidatingState(nodes), List::isEmpty,
+            VALIDATION_MAX_RETRIES, VALIDATION_RETRY_PAUSE_DURATION_MS);
+        if (stillValidating.isPresent()) {
+            LOGGER.error("The following components are still in VALIDATING state: {}", stillValidating.get());
+            throw new IllegalStateException("Maximum retry number exceeded while waiting for components to be validated");
+        }
+
+        return nodes.stream()
+            .flatMap(node -> node.getValidationErrors().stream())
+            .toList();
+    }
+
+    /** Gathers every controller service, reporting task and processor (from the root group downwards) into one list. */
+    private static List<? extends ComponentNode> extractComponentNodes(FlowManager flowManager) {
+        Set<? extends ComponentNode> controllerServices = flowManager.getAllControllerServices();
+        Set<? extends ComponentNode> reportingTasks = flowManager.getAllReportingTasks();
+        Set<? extends ComponentNode> processors = Set.copyOf(flowManager.getRootGroup().findAllProcessors());
+        return Stream.of(controllerServices, reportingTasks, processors)
+            .flatMap(Set::stream)
+            .toList();
+    }
+
+    /** Triggers validation on each component and returns those that report VALIDATING. */
+    private static List<? extends ComponentNode> componentsInValidatingState(List<? extends ComponentNode> componentNodes) {
+        return componentNodes.stream()
+            .filter(node -> VALIDATING == node.performValidation())
+            .toList();
+    }
+
+    /** Returns the INVALID components that are async loaded processors in one of the initializing load states. */
+    private static List<? extends ComponentNode> initializingAsyncLoadingComponents(List<? extends ComponentNode> componentNodes) {
+        return componentNodes.stream()
+            .filter(node -> node.performValidation() == INVALID
+                && node.getComponent() instanceof AsyncLoadedProcessor asyncProcessor
+                && INITIALIZING_ASYNC_PROCESSOR_STATES.contains(asyncProcessor.getState()))
+            .toList();
+    }
+}
diff --git 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java
 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java
index 132be38e59..e00e933c59 100644
--- 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java
+++ 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java
@@ -19,11 +19,14 @@ package org.apache.nifi.minifi;
 
 import static java.util.Optional.ofNullable;
 import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.nifi.minifi.validator.FlowValidator.validate;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.List;
 import java.util.Optional;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.headless.HeadlessNiFiServer;
 import org.apache.nifi.minifi.bootstrap.BootstrapListener;
 import org.apache.nifi.minifi.c2.C2NifiClientService;
@@ -35,28 +38,14 @@ import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- *
- */
 public class StandardMiNiFiServer extends HeadlessNiFiServer implements 
MiNiFiServer {
 
     private static final Logger logger = 
LoggerFactory.getLogger(StandardMiNiFiServer.class);
-    public static final String BOOTSTRAP_PORT_PROPERTY = 
"nifi.bootstrap.listen.port";
+    private static final String BOOTSTRAP_PORT_PROPERTY = 
"nifi.bootstrap.listen.port";
 
     private BootstrapListener bootstrapListener;
-
-    /* A reference to the client service for handling*/
     private C2NifiClientService c2NifiClientService;
 
-
-    public StandardMiNiFiServer() {
-        super();
-    }
-
-    public FlowStatusReport getStatusReport(String requestString) throws 
StatusRequestException {
-        return StatusConfigReporter.getStatus(getFlowController(), 
requestString, logger);
-    }
-
     @Override
     public void start() {
         super.start();
@@ -67,6 +56,16 @@ public class StandardMiNiFiServer extends HeadlessNiFiServer 
implements MiNiFiSe
         startHeartbeat();
     }
 
+    /**
+     * Validates the loaded flow and refuses to continue when any validation error is reported.
+     *
+     * @throws IllegalStateException if the flow has at least one validation error
+     */
+    @Override
+    protected void validateFlow() {
+        List<ValidationResult> problems = validate(getFlowController().getFlowManager());
+        if (problems.isEmpty()) {
+            logger.info("Flow validated successfully");
+            return;
+        }
+        logger.error("Validation errors found when loading the flow: {}", problems);
+        throw new IllegalStateException("Unable to start flow due to validation errors");
+    }
+
     @Override
     public void stop(boolean reload) {
         super.stop();
@@ -86,6 +85,10 @@ public class StandardMiNiFiServer extends HeadlessNiFiServer 
implements MiNiFiSe
         }
     }
 
+    /**
+     * Builds a status report for the given request by delegating to {@code StatusConfigReporter.getStatus}
+     * with this server's flow controller and logger.
+     *
+     * @param requestString the status request, passed through unchanged
+     * @return the report produced by {@code StatusConfigReporter}
+     * @throws StatusRequestException declared for failures of the status request
+     */
+    public FlowStatusReport getStatusReport(String requestString) throws 
StatusRequestException {
+        return StatusConfigReporter.getStatus(getFlowController(), 
requestString, logger);
+    }
+
     private void initC2() {
         if 
(Boolean.parseBoolean(getNiFiProperties().getProperty(MiNiFiProperties.C2_ENABLE.getKey(),
 MiNiFiProperties.C2_ENABLE.getDefaultValue()))) {
             NiFiProperties niFiProperties = getNiFiProperties();
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
 
b/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
index f64a738358..66320bcbc0 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
@@ -169,6 +169,7 @@ public class HeadlessNiFiServer implements NiFiServer {
             flowService.start();
             flowService.load(null);
             flowController.onFlowInitialized(true);
+            validateFlow();
             FlowManager flowManager = flowController.getFlowManager();
             
flowManager.getGroup(flowManager.getRootGroupId()).startProcessing();
 
@@ -195,6 +196,10 @@ public class HeadlessNiFiServer implements NiFiServer {
         }
     }
 
+    /**
+     * Validation hook called during startup after the flow has been loaded and initialized,
+     * before the root group starts processing. This default implementation performs no validation
+     * and only logs that fact; subclasses may override it to reject an invalid flow.
+     */
+    protected void validateFlow() {
+        logger.info("Flow validation not implemented. Proceeding without 
validating the flow");
+    }
+
     private void startUpFailure(Throwable t) {
         System.err.println("Failed to start flow service: " + t.getMessage());
         System.err.println("Shutting down...");

Reply via email to