This is an automated email from the ASF dual-hosted git repository.
bejancsaba pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new dca7aac9b3 NIFI-13541 [MiNiFi] Validate flow during startup
dca7aac9b3 is described below
commit dca7aac9b3f3b9e79ed37572d17f69c89d295470
Author: Ferenc Kis <[email protected]>
AuthorDate: Thu Jul 11 12:35:07 2024 +0200
NIFI-13541 [MiNiFi] Validate flow during startup
Signed-off-by: Csaba Bejan <[email protected]>
This closes #9073.
---
.../nifi/minifi/commons/utils/RetryUtil.java | 47 ++++++
.../minifi-framework/minifi-framework-core/pom.xml | 5 +
.../DefaultUpdateConfigurationStrategy.java | 162 ++++++---------------
.../nifi/minifi/validator/FlowValidator.java | 98 +++++++++++++
.../apache/nifi/minifi/StandardMiNiFiServer.java | 33 +++--
.../apache/nifi/headless/HeadlessNiFiServer.java | 5 +
6 files changed, 214 insertions(+), 136 deletions(-)
diff --git
a/minifi/minifi-commons/minifi-commons-utils/src/main/java/org/apache/nifi/minifi/commons/utils/RetryUtil.java
b/minifi/minifi-commons/minifi-commons-utils/src/main/java/org/apache/nifi/minifi/commons/utils/RetryUtil.java
new file mode 100644
index 0000000000..74411aad6e
--- /dev/null
+++
b/minifi/minifi-commons/minifi-commons-utils/src/main/java/org/apache/nifi/minifi/commons/utils/RetryUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.utils;
+
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+public final class RetryUtil {
+
+ private RetryUtil() {
+ throw new UnsupportedOperationException();
+ }
+
+ public static <T> Optional<T> retry(Supplier<T> input, Predicate<T>
predicate, int maxRetries, int pauseDurationMillis) {
+ int retries = 0;
+ while (true) {
+ T t = input.get();
+ if (predicate.test(t)) {
+ return Optional.empty();
+ }
+ if (retries == maxRetries) {
+ return Optional.ofNullable(t);
+ }
+ retries++;
+ try {
+ Thread.sleep(pauseDurationMillis);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+}
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml
index 37ed6dd95f..addc049ebf 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml
@@ -145,6 +145,11 @@ limitations under the License.
<artifactId>nifi-framework-core</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi.minifi</groupId>
+ <artifactId>minifi-commons-utils</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-python-framework-api</artifactId>
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java
index d5ccac1e53..47504750be 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java
@@ -17,15 +17,35 @@
package org.apache.nifi.minifi.c2.command;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.emptySet;
+import static java.util.UUID.randomUUID;
+import static java.util.function.Predicate.not;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.backup;
+import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.persist;
+import static
org.apache.nifi.minifi.commons.util.FlowUpdateUtils.removeIfExists;
+import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.revert;
+import static org.apache.nifi.minifi.commons.utils.RetryUtil.retry;
+import static org.apache.nifi.minifi.validator.FlowValidator.validate;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.commons.io.FilenameUtils;
import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
-import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
-import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.flow.VersionedDataflow;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedProcessGroup;
@@ -38,49 +58,12 @@ import org.apache.nifi.services.FlowService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Collections.emptySet;
-import static java.util.UUID.randomUUID;
-import static java.util.function.Predicate.not;
-import static
org.apache.nifi.components.AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES;
-import static
org.apache.nifi.components.AsyncLoadedProcessor.LoadState.INITIALIZING_ENVIRONMENT;
-import static
org.apache.nifi.components.AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE;
-import static org.apache.nifi.components.validation.ValidationStatus.INVALID;
-import static
org.apache.nifi.components.validation.ValidationStatus.VALIDATING;
-import static
org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
-import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
-import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.backup;
-import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.persist;
-import static
org.apache.nifi.minifi.commons.util.FlowUpdateUtils.removeIfExists;
-import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.revert;
-
public class DefaultUpdateConfigurationStrategy implements
UpdateConfigurationStrategy {
private static final Logger LOGGER =
LoggerFactory.getLogger(DefaultUpdateConfigurationStrategy.class);
- private static final Set<AsyncLoadedProcessor.LoadState>
INITIALIZING_ASYNC_PROCESSOR_STATES =
- Set.of(INITIALIZING_ENVIRONMENT, DOWNLOADING_DEPENDENCIES,
LOADING_PROCESSOR_CODE);
-
- private static int ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS =
5000;
- private static int ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES = 60;
- private static int VALIDATION_RETRY_PAUSE_DURATION_MS = 1000;
- private static int VALIDATION_MAX_RETRIES = 5;
- private static int FLOW_DRAIN_RETRY_PAUSE_DURATION_MS = 1000;
- private static int FLOW_DRAIN_MAX_RETRIES = 60;
+ private static final int FLOW_DRAIN_RETRY_PAUSE_DURATION_MS = 1000;
+ private static final int FLOW_DRAIN_MAX_RETRIES = 60;
private final FlowController flowController;
private final FlowService flowService;
@@ -115,9 +98,9 @@ public class DefaultUpdateConfigurationStrategy implements
UpdateConfigurationSt
Set<String> originalConnectionIds = emptySet();
try {
originalConnectionIds =
findAllExistingConnections(flowController.getFlowManager().getRootGroup())
- .stream()
- .map(Connection::getIdentifier)
- .collect(Collectors.toSet());
+ .stream()
+ .map(Connection::getIdentifier)
+ .collect(Collectors.toSet());
VersionedDataflow rawDataFlow =
flowSerDeService.deserialize(rawFlow);
VersionedDataflow propertyEncryptedRawDataFlow =
flowPropertyEncryptor.encryptSensitiveProperties(rawDataFlow);
@@ -187,9 +170,9 @@ public class DefaultUpdateConfigurationStrategy implements
UpdateConfigurationSt
return rootProcessGroup -> {
LOGGER.warn("Flow did not stop within graceful period. Force
stopping flow and emptying non referenced queues");
findAllExistingConnections(rootProcessGroup).stream()
- .filter(connection ->
!proposedConnectionIds.contains(connection.getIdentifier()))
- .map(Connection::getFlowFileQueue)
- .forEach(queue ->
queue.dropFlowFiles(randomUUID().toString(), randomUUID().toString()));
+ .filter(connection ->
!proposedConnectionIds.contains(connection.getIdentifier()))
+ .map(Connection::getFlowFileQueue)
+ .forEach(queue -> queue.dropFlowFiles(randomUUID().toString(),
randomUUID().toString()));
};
}
@@ -216,84 +199,21 @@ public class DefaultUpdateConfigurationStrategy
implements UpdateConfigurationSt
}
}
- private List<ValidationResult> validate(FlowManager flowManager) {
- List<? extends ComponentNode> componentNodes =
extractComponentNodes(flowManager);
-
- retry(() -> initializingAsyncLoadingComponents(componentNodes),
List::isEmpty,
- ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES,
ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS)
- .ifPresent(components -> {
- LOGGER.error("The following components are async loading
components and are still initializing: {}", components);
- throw new IllegalStateException("Maximum retry number exceeded
while waiting for async loading components to be initialized");
- });
-
- retry(() -> componentsInValidatingState(componentNodes),
List::isEmpty, VALIDATION_MAX_RETRIES, VALIDATION_RETRY_PAUSE_DURATION_MS)
- .ifPresent(components -> {
- LOGGER.error("The following components are still in VALIDATING
state: {}", components);
- throw new IllegalStateException("Maximum retry number exceeded
while waiting for components to be validated");
- });
-
- return componentNodes.stream()
- .map(ComponentNode::getValidationErrors)
- .flatMap(Collection::stream)
- .toList();
- }
-
- private List<? extends ComponentNode> extractComponentNodes(FlowManager
flowManager) {
- return Stream.of(
- flowManager.getAllControllerServices(),
- flowManager.getAllReportingTasks(),
- Set.copyOf(flowManager.getRootGroup().findAllProcessors()))
- .flatMap(Set::stream)
- .toList();
- }
-
- private List<? extends ComponentNode> componentsInValidatingState(List<?
extends ComponentNode> componentNodes) {
- return componentNodes.stream()
- .filter(componentNode -> componentNode.performValidation() ==
VALIDATING)
- .toList();
- }
-
- private List<? extends ComponentNode>
initializingAsyncLoadingComponents(List<? extends ComponentNode>
componentNodes) {
- return componentNodes.stream()
- .filter(componentNode -> componentNode.performValidation() ==
INVALID)
- .filter(componentNode -> componentNode.getComponent() instanceof
AsyncLoadedProcessor asyncLoadedProcessor
- &&
INITIALIZING_ASYNC_PROCESSOR_STATES.contains(asyncLoadedProcessor.getState()))
- .toList();
- }
-
- private <T> Optional<T> retry(Supplier<T> input, Predicate<T> predicate,
int maxRetries, int pauseDurationMillis) {
- int retries = 0;
- while (true) {
- T t = input.get();
- if (predicate.test(t)) {
- return Optional.empty();
- }
- if (retries == maxRetries) {
- return Optional.ofNullable(t);
- }
- retries++;
- try {
- Thread.sleep(pauseDurationMillis);
- } catch (InterruptedException e) {
- }
- }
- }
-
private Set<String> findAllProposedConnectionIds(VersionedProcessGroup
versionedProcessGroup) {
return versionedProcessGroup == null
- ? emptySet()
- : Stream.concat(
-
versionedProcessGroup.getConnections().stream().map(VersionedConnection::getInstanceIdentifier),
-
versionedProcessGroup.getProcessGroups().stream().map(this::findAllProposedConnectionIds).flatMap(Set::stream)
- ).collect(Collectors.toSet());
+ ? emptySet()
+ : Stream.concat(
+
versionedProcessGroup.getConnections().stream().map(VersionedConnection::getInstanceIdentifier),
+
versionedProcessGroup.getProcessGroups().stream().map(this::findAllProposedConnectionIds).flatMap(Set::stream)
+ ).collect(Collectors.toSet());
}
private Set<Connection> findAllExistingConnections(ProcessGroup
processGroup) {
return processGroup == null
- ? emptySet()
- : Stream.concat(
- processGroup.getConnections().stream(),
-
processGroup.getProcessGroups().stream().map(this::findAllExistingConnections).flatMap(Set::stream)
- ).collect(Collectors.toSet());
+ ? emptySet()
+ : Stream.concat(
+ processGroup.getConnections().stream(),
+
processGroup.getProcessGroups().stream().map(this::findAllExistingConnections).flatMap(Set::stream)
+ ).collect(Collectors.toSet());
}
}
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/validator/FlowValidator.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/validator/FlowValidator.java
new file mode 100644
index 0000000000..e479b57dc1
--- /dev/null
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/validator/FlowValidator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.validator;
+
+import static
org.apache.nifi.components.AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES;
+import static
org.apache.nifi.components.AsyncLoadedProcessor.LoadState.INITIALIZING_ENVIRONMENT;
+import static
org.apache.nifi.components.AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE;
+import static org.apache.nifi.components.validation.ValidationStatus.INVALID;
+import static
org.apache.nifi.components.validation.ValidationStatus.VALIDATING;
+import static org.apache.nifi.minifi.commons.utils.RetryUtil.retry;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.nifi.components.AsyncLoadedProcessor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class FlowValidator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FlowValidator.class);
+
+ private static final Set<AsyncLoadedProcessor.LoadState>
INITIALIZING_ASYNC_PROCESSOR_STATES =
+ Set.of(INITIALIZING_ENVIRONMENT, DOWNLOADING_DEPENDENCIES,
LOADING_PROCESSOR_CODE);
+
+ private static final int
ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS = 5000;
+ private static final int ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES = 60;
+ private static final int VALIDATION_RETRY_PAUSE_DURATION_MS = 1000;
+ private static final int VALIDATION_MAX_RETRIES = 5;
+
+ private FlowValidator() {
+ throw new UnsupportedOperationException();
+ }
+
+ public static List<ValidationResult> validate(FlowManager flowManager) {
+ List<? extends ComponentNode> componentNodes =
extractComponentNodes(flowManager);
+
+ retry(() -> initializingAsyncLoadingComponents(componentNodes),
List::isEmpty,
+ ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES,
ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS)
+ .ifPresent(components -> {
+ LOGGER.error("The following components are async loading
components and are still initializing: {}", components);
+ throw new IllegalStateException("Maximum retry number exceeded
while waiting for async loading components to be initialized");
+ });
+
+ retry(() -> componentsInValidatingState(componentNodes),
List::isEmpty, VALIDATION_MAX_RETRIES, VALIDATION_RETRY_PAUSE_DURATION_MS)
+ .ifPresent(components -> {
+ LOGGER.error("The following components are still in VALIDATING
state: {}", components);
+ throw new IllegalStateException("Maximum retry number exceeded
while waiting for components to be validated");
+ });
+
+ return componentNodes.stream()
+ .map(ComponentNode::getValidationErrors)
+ .flatMap(Collection::stream)
+ .toList();
+ }
+
+ private static List<? extends ComponentNode>
extractComponentNodes(FlowManager flowManager) {
+ return Stream.of(
+ flowManager.getAllControllerServices(),
+ flowManager.getAllReportingTasks(),
+ Set.copyOf(flowManager.getRootGroup().findAllProcessors()))
+ .flatMap(Set::stream)
+ .toList();
+ }
+
+ private static List<? extends ComponentNode>
componentsInValidatingState(List<? extends ComponentNode> componentNodes) {
+ return componentNodes.stream()
+ .filter(componentNode -> componentNode.performValidation() ==
VALIDATING)
+ .toList();
+ }
+
+ private static List<? extends ComponentNode>
initializingAsyncLoadingComponents(List<? extends ComponentNode>
componentNodes) {
+ return componentNodes.stream()
+ .filter(componentNode -> componentNode.performValidation() ==
INVALID)
+ .filter(componentNode -> componentNode.getComponent() instanceof
AsyncLoadedProcessor asyncLoadedProcessor
+ &&
INITIALIZING_ASYNC_PROCESSOR_STATES.contains(asyncLoadedProcessor.getState()))
+ .toList();
+ }
+}
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java
index 132be38e59..e00e933c59 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java
@@ -19,11 +19,14 @@ package org.apache.nifi.minifi;
import static java.util.Optional.ofNullable;
import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.nifi.minifi.validator.FlowValidator.validate;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.List;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.headless.HeadlessNiFiServer;
import org.apache.nifi.minifi.bootstrap.BootstrapListener;
import org.apache.nifi.minifi.c2.C2NifiClientService;
@@ -35,28 +38,14 @@ import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- *
- */
public class StandardMiNiFiServer extends HeadlessNiFiServer implements
MiNiFiServer {
private static final Logger logger =
LoggerFactory.getLogger(StandardMiNiFiServer.class);
- public static final String BOOTSTRAP_PORT_PROPERTY =
"nifi.bootstrap.listen.port";
+ private static final String BOOTSTRAP_PORT_PROPERTY =
"nifi.bootstrap.listen.port";
private BootstrapListener bootstrapListener;
-
- /* A reference to the client service for handling*/
private C2NifiClientService c2NifiClientService;
-
- public StandardMiNiFiServer() {
- super();
- }
-
- public FlowStatusReport getStatusReport(String requestString) throws
StatusRequestException {
- return StatusConfigReporter.getStatus(getFlowController(),
requestString, logger);
- }
-
@Override
public void start() {
super.start();
@@ -67,6 +56,16 @@ public class StandardMiNiFiServer extends HeadlessNiFiServer
implements MiNiFiSe
startHeartbeat();
}
+ @Override
+ protected void validateFlow() {
+ List<ValidationResult> validationErrors =
validate(getFlowController().getFlowManager());
+ if (!validationErrors.isEmpty()) {
+ logger.error("Validation errors found when loading the flow: {}",
validationErrors);
+ throw new IllegalStateException("Unable to start flow due to
validation errors");
+ }
+ logger.info("Flow validated successfully");
+ }
+
@Override
public void stop(boolean reload) {
super.stop();
@@ -86,6 +85,10 @@ public class StandardMiNiFiServer extends HeadlessNiFiServer
implements MiNiFiSe
}
}
+ public FlowStatusReport getStatusReport(String requestString) throws
StatusRequestException {
+ return StatusConfigReporter.getStatus(getFlowController(),
requestString, logger);
+ }
+
private void initC2() {
if
(Boolean.parseBoolean(getNiFiProperties().getProperty(MiNiFiProperties.C2_ENABLE.getKey(),
MiNiFiProperties.C2_ENABLE.getDefaultValue()))) {
NiFiProperties niFiProperties = getNiFiProperties();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
b/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
index f64a738358..66320bcbc0 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
@@ -169,6 +169,7 @@ public class HeadlessNiFiServer implements NiFiServer {
flowService.start();
flowService.load(null);
flowController.onFlowInitialized(true);
+ validateFlow();
FlowManager flowManager = flowController.getFlowManager();
flowManager.getGroup(flowManager.getRootGroupId()).startProcessing();
@@ -195,6 +196,10 @@ public class HeadlessNiFiServer implements NiFiServer {
}
}
+ protected void validateFlow() {
+ logger.info("Flow validation not implemented. Proceeding without
validating the flow");
+ }
+
private void startUpFailure(Throwable t) {
System.err.println("Failed to start flow service: " + t.getMessage());
System.err.println("Shutting down...");