This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f7df326 NIFI-9183: Add a command-line option to save status history
f7df326 is described below
commit f7df3265215cfe74769b908db1d3298c7529ba00
Author: Lehel Boér <[email protected]>
AuthorDate: Wed Aug 25 18:08:02 2021 +0200
NIFI-9183: Add a command-line option to save status history
Fixed typo error.
Fixed error.
Fixed review items.
Fix contrib-check. Added missing test file to excludes.
Fix review items.
Fix review items: error handling, input validation, added more unit tests.
Improved status history repository creation in HeadlessNiFiServer.
---
.../java/org/apache/nifi/bootstrap/RunNiFi.java | 62 +++++++++++++++-
.../nifi/bootstrap/util/DumpFileValidator.java | 59 ++++++++++++++++
.../status/history/StatusHistoryDump.java | 37 ++++------
.../status/history/StatusHistoryDumpFactory.java | 34 +++------
.../nifi/documentation/example/NiFiServerStub.java | 8 +++
.../apache/nifi/cluster/integration/Cluster.java | 6 +-
.../org/apache/nifi/cluster/integration/Node.java | 13 ++--
.../org/apache/nifi/controller/FlowController.java | 35 +++------
.../status/history/JsonNodeStatusHistoryDump.java | 43 ++++++++++++
.../history/JsonNodeStatusHistoryDumpFactory.java | 46 ++++++++++++
.../nifi/spring/FlowControllerFactoryBean.java | 12 +++-
.../spring/StatusHistoryRepositoryFactoryBean.java | 75 ++++++++++++++++++++
.../src/main/resources/nifi-context.xml | 10 +++
.../nifi/controller/StandardFlowServiceTest.java | 5 +-
.../nifi/controller/StandardProcessorNodeIT.java | 3 +-
.../apache/nifi/controller/TestFlowController.java | 69 +++++++++---------
.../reporting/TestStandardReportingContext.java | 6 +-
.../scheduling/ProcessorLifecycleIT.java | 15 ++--
.../serialization/StandardFlowSerializerTest.java | 3 +-
.../JsonNodeStatusHistoryDumpFactoryTest.java | 82 ++++++++++++++++++++++
.../nifi/integration/FrameworkIntegrationTest.java | 10 ++-
.../apache/nifi/headless/HeadlessNiFiServer.java | 17 ++++-
.../nifi-resources/src/main/resources/bin/nifi.sh | 4 +-
.../java/org/apache/nifi/BootstrapListener.java | 19 ++++-
.../org/apache/nifi/web/server/JettyServer.java | 20 ++++--
.../nifi-standard-processors/pom.xml | 1 +
.../src/main/java/org/apache/nifi/NiFiServer.java | 3 +
27 files changed, 561 insertions(+), 136 deletions(-)
diff --git
a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
index 7d3ecc8..8abc83c 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
@@ -18,6 +18,7 @@ package org.apache.nifi.bootstrap;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.bootstrap.notification.NotificationType;
+import org.apache.nifi.bootstrap.util.DumpFileValidator;
import org.apache.nifi.bootstrap.util.OSUtils;
import org.apache.nifi.bootstrap.util.SecureNiFiConfigUtil;
import org.apache.nifi.util.file.FileUtils;
@@ -44,6 +45,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
+import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
@@ -90,6 +92,7 @@ public class RunNiFi {
public static final String DEFAULT_JAVA_CMD = "java";
public static final String DEFAULT_PID_DIR = "bin";
public static final String DEFAULT_LOG_DIR = "./logs";
+ public static final String DEFAULT_STATUS_HISTORY_DAYS = "1";
public static final String GRACEFUL_SHUTDOWN_PROP =
"graceful.shutdown.seconds";
public static final String DEFAULT_GRACEFUL_SHUTDOWN_VALUE = "20";
@@ -119,9 +122,12 @@ public class RunNiFi {
public static final String DUMP_CMD = "DUMP";
public static final String DIAGNOSTICS_CMD = "DIAGNOSTICS";
public static final String IS_LOADED_CMD = "IS_LOADED";
+ public static final String STATUS_HISTORY_CMD = "STATUS_HISTORY";
private static final int UNINITIALIZED_CC_PORT = -1;
+ private static final int INVALID_CMD_ARGUMENT = -1;
+
private volatile boolean autoRestartNiFi = true;
private volatile int ccPort = UNINITIALIZED_CC_PORT;
private volatile long nifiPid = -1L;
@@ -175,7 +181,9 @@ public class RunNiFi {
System.out.println("Status : Determine if there is a running instance
of Apache NiFi");
System.out.println("Dump : Write a Thread Dump to the file specified
by [options], or to the log if no file is given");
System.out.println("Diagnostics : Write diagnostic information to the
file specified by [options], or to the log if no file is given. The --verbose
flag may be provided as an option before " +
- "the filename, which may result in additional diagnostic
information being written.");
+ "the filename, which may result in additional diagnostic
information being written.");
+ System.out.println("Status-history : Save the status history to the
file specified by [options]. The expected command parameters are: " +
+ "status-history <number of days> <dumpFile>. The <number of
days> parameter is optional and defaults to 1 day.");
System.out.println("Run : Start a new instance of Apache NiFi and
monitor the Process, restarting if the instance dies");
System.out.println();
}
@@ -192,6 +200,7 @@ public class RunNiFi {
File dumpFile = null;
boolean verbose = false;
+ String statusHistoryDays = null;
final String cmd = args[0];
if (cmd.equalsIgnoreCase("dump")) {
@@ -216,6 +225,41 @@ public class RunNiFi {
dumpFile = null;
verbose = false;
}
+ } else if (cmd.equalsIgnoreCase("status-history")) {
+ if (args.length < 2) {
+ System.err.printf("Wrong number of arguments: %d instead of 1
or 2, the command parameters are: " +
+ "status-history <number of days> <dumpFile>%n", 0);
+ System.exit(INVALID_CMD_ARGUMENT);
+ }
+ if (args.length == 3) {
+ statusHistoryDays = args[1];
+ try {
+ final int numberOfDays =
Integer.parseInt(statusHistoryDays);
+ if (numberOfDays < 1) {
+ System.err.println("The <number of days> parameter
must be positive and greater than zero. The command parameters are:" +
+ " status-history <number of days> <dumpFile>");
+ System.exit(INVALID_CMD_ARGUMENT);
+ }
+ } catch (NumberFormatException e) {
+ System.err.println("The <number of days> parameter value
is not a number. The command parameters are: status-history <number of days>
<dumpFile>");
+ System.exit(INVALID_CMD_ARGUMENT);
+ }
+ try {
+ Paths.get(args[2]);
+ } catch (InvalidPathException e) {
+ System.err.println("Invalid filename. The command
parameters are: status-history <number of days> <dumpFile>");
+ System.exit(INVALID_CMD_ARGUMENT);
+ }
+ dumpFile = new File(args[2]);
+ } else {
+ final boolean isValid = DumpFileValidator.validate(args[1]);
+ if (isValid) {
+ statusHistoryDays = DEFAULT_STATUS_HISTORY_DAYS;
+ dumpFile = new File(args[1]);
+ } else {
+ System.exit(INVALID_CMD_ARGUMENT);
+ }
+ }
}
switch (cmd.toLowerCase()) {
@@ -227,6 +271,7 @@ public class RunNiFi {
case "is_loaded":
case "dump":
case "diagnostics":
+ case "status-history":
case "restart":
case "env":
break;
@@ -272,6 +317,9 @@ public class RunNiFi {
case "diagnostics":
runNiFi.diagnostics(dumpFile, verbose);
break;
+ case "status-history":
+ runNiFi.statusHistory(dumpFile, statusHistoryDays);
+ break;
case "env":
runNiFi.env();
break;
@@ -729,6 +777,17 @@ public class RunNiFi {
makeRequest(DUMP_CMD, null, dumpFile, "thread dump");
}
+ /**
+ * Writes NiFi status history information to the given file.
+ *
+ * @param dumpFile the file to write the dump content to
+ * @throws IOException if any issues occur while writing the dump file
+ */
+ public void statusHistory(final File dumpFile, final String days) throws
IOException {
+ // Due to input validation, the dumpFile cannot currently be null in
this scenario.
+ makeRequest(STATUS_HISTORY_CMD, days, dumpFile, "status history
information");
+ }
+
private boolean isNiFiFullyLoaded() throws IOException,
NiFiNotRunningException {
final Logger logger = defaultLogger;
final Integer port = getCurrentPort(logger);
@@ -752,6 +811,7 @@ public class RunNiFi {
final Logger logger = defaultLogger; // dump to bootstrap log file
by default
final Integer port = getCurrentPort(logger);
if (port == null) {
+ cmdLogger.info("Apache NiFi is not currently running");
logger.info("Apache NiFi is not currently running");
return;
}
diff --git
a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/DumpFileValidator.java
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/DumpFileValidator.java
new file mode 100644
index 0000000..8eaef04
--- /dev/null
+++
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/DumpFileValidator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.bootstrap.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.InvalidPathException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public final class DumpFileValidator {
+
+ private static final Logger logger =
LoggerFactory.getLogger(DumpFileValidator.class);
+
+ private DumpFileValidator() {
+ }
+
+ public static boolean validate(final String filePath) {
+ try {
+ final Path path = Paths.get(filePath);
+ return checkFileCanBeCreated(path);
+ } catch (InvalidPathException e) {
+ System.err.println("Invalid filename. The command parameters are:
status-history <number of days> <dumpFile>");
+ return false;
+ }
+ }
+
+ private static boolean checkFileCanBeCreated(final Path path) {
+ try (final FileOutputStream outputStream = new
FileOutputStream(path.toString());
+ final Closeable onClose = () -> Files.delete(path)) {
+ } catch (FileNotFoundException e) {
+ System.err.println("Invalid filename or there's no write
permission to the currently selected file path.");
+ return false;
+ } catch (IOException e) {
+ logger.error("Could not delete file while validating file path.");
+ }
+ return true;
+ }
+}
diff --git a/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryDump.java
similarity index 56%
copy from nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
copy to
nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryDump.java
index cc87079..c218115 100644
--- a/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryDump.java
@@ -14,30 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi;
+package org.apache.nifi.controller.status.history;
-import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.controller.DecommissionTask;
-import org.apache.nifi.diagnostics.DiagnosticsFactory;
-import org.apache.nifi.nar.ExtensionMapping;
-import org.apache.nifi.util.NiFiProperties;
-
-import java.util.Set;
+import java.io.IOException;
+import java.io.OutputStream;
/**
- *
+ * Container for status history data that is capable of writing it in an
implementation-dependent format.
*/
-public interface NiFiServer {
-
- void start();
-
- void initialize(NiFiProperties properties, Bundle systemBundle,
Set<Bundle> bundles, ExtensionMapping extensionMapping);
-
- void stop();
-
- DiagnosticsFactory getDiagnosticsFactory();
-
- DiagnosticsFactory getThreadDumpFactory();
-
- DecommissionTask getDecommissionTask();
-}
+public interface StatusHistoryDump {
+
+ /**
+ * Writes a status history dump to an output stream.
+ *
+ * @param out the output stream
+ * @throws IOException if the dump cannot be serialized
+ */
+ void writeTo(final OutputStream out) throws IOException;
+}
\ No newline at end of file
diff --git a/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryDumpFactory.java
similarity index 56%
copy from nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
copy to
nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryDumpFactory.java
index cc87079..83f8076 100644
--- a/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryDumpFactory.java
@@ -14,30 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi;
-
-import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.controller.DecommissionTask;
-import org.apache.nifi.diagnostics.DiagnosticsFactory;
-import org.apache.nifi.nar.ExtensionMapping;
-import org.apache.nifi.util.NiFiProperties;
-
-import java.util.Set;
+package org.apache.nifi.controller.status.history;
/**
- *
+ * Factory interface for creating StatusHistoryDump instances.
*/
-public interface NiFiServer {
-
- void start();
-
- void initialize(NiFiProperties properties, Bundle systemBundle,
Set<Bundle> bundles, ExtensionMapping extensionMapping);
-
- void stop();
-
- DiagnosticsFactory getDiagnosticsFactory();
-
- DiagnosticsFactory getThreadDumpFactory();
-
- DecommissionTask getDecommissionTask();
+public interface StatusHistoryDumpFactory {
+
+ /**
+ * Creates a status history dump object.
+ *
+ * @param days the number of days to go back when collecting the status history
+ * @return the status history dump
+ */
+ StatusHistoryDump create(final int days);
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java
index d82bec9..2a1964a 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java
@@ -19,6 +19,7 @@ package org.apache.nifi.documentation.example;
import org.apache.nifi.NiFiServer;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.controller.DecommissionTask;
+import org.apache.nifi.controller.status.history.StatusHistoryDumpFactory;
import org.apache.nifi.diagnostics.DiagnosticsFactory;
import org.apache.nifi.nar.ExtensionMapping;
import org.apache.nifi.util.NiFiProperties;
@@ -60,4 +61,11 @@ public class NiFiServerStub implements NiFiServer {
public DecommissionTask getDecommissionTask() {
return null;
}
+
+ @Override
+ public StatusHistoryDumpFactory getStatusHistoryDumpFactory() {
+ return null;
+ }
+
+
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
index 00b76f1..7260927 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
@@ -25,6 +25,8 @@ import org.apache.curator.test.TestingServer;
import org.apache.nifi.cluster.coordination.flow.FlowElection;
import org.apache.nifi.cluster.coordination.flow.PopularVoteFlowElection;
import org.apache.nifi.cluster.coordination.node.ClusterRoles;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
+import
org.apache.nifi.controller.status.history.VolatileComponentStatusRepository;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.encrypt.PropertyEncryptorFactory;
import org.apache.nifi.encrypt.SensitiveValueEncoder;
@@ -141,8 +143,8 @@ public class Cluster {
final ExtensionDiscoveringManager extensionManager = new
StandardExtensionDiscoveringManager();
final FingerprintFactory fingerprintFactory = new
FingerprintFactory(encryptor, extensionManager, sensitiveValueEncoder);
final FlowElection flowElection = new
PopularVoteFlowElection(flowElectionTimeoutMillis, TimeUnit.MILLISECONDS,
flowElectionMaxNodes, fingerprintFactory);
-
- final Node node = new Node(nifiProperties, extensionManager,
flowElection);
+ final StatusHistoryRepository statusHistoryRepository = new
VolatileComponentStatusRepository(nifiProperties);
+ final Node node = new Node(nifiProperties, extensionManager,
flowElection, statusHistoryRepository);
node.start();
nodes.add(node);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
index 8cec815..5e239b2 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
@@ -46,6 +46,7 @@ import org.apache.nifi.controller.StandardFlowService;
import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.encrypt.PropertyEncryptorFactory;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.EventReporter;
@@ -84,6 +85,7 @@ public class Node {
private final List<ReportedEvent> reportedEvents =
Collections.synchronizedList(new ArrayList<ReportedEvent>());
private final RevisionManager revisionManager;
private final FlowElection flowElection;
+ private final StatusHistoryRepository statusHistoryRepository;
private NodeClusterCoordinator clusterCoordinator;
private NodeProtocolSender protocolSender;
@@ -98,11 +100,13 @@ public class Node {
private ScheduledExecutorService executor = new FlowEngine(8, "Node
tasks", true);
- public Node(final NiFiProperties properties, final
ExtensionDiscoveringManager extensionManager, final FlowElection flowElection) {
- this(createNodeId(), properties, extensionManager, flowElection);
+ public Node(final NiFiProperties properties, final
ExtensionDiscoveringManager extensionManager, final FlowElection flowElection,
+ final StatusHistoryRepository statusHistoryRepository) {
+ this(createNodeId(), properties, extensionManager, flowElection,
statusHistoryRepository);
}
- public Node(final NodeIdentifier nodeId, final NiFiProperties properties,
final ExtensionDiscoveringManager extensionManager, final FlowElection
flowElection) {
+ public Node(final NodeIdentifier nodeId, final NiFiProperties properties,
final ExtensionDiscoveringManager extensionManager,
+ final FlowElection flowElection, final StatusHistoryRepository
statusHistoryRepository) {
this.nodeId = nodeId;
this.nodeProperties = new NiFiProperties() {
@Override
@@ -137,6 +141,7 @@ public class Node {
electionManager = new CuratorLeaderElectionManager(4, nodeProperties);
this.flowElection = flowElection;
+ this.statusHistoryRepository = statusHistoryRepository;
}
@@ -156,7 +161,7 @@ public class Node {
flowController =
FlowController.createClusteredInstance(Mockito.mock(FlowFileEventRepository.class),
nodeProperties,
null, null,
PropertyEncryptorFactory.getPropertyEncryptor(nodeProperties), protocolSender,
Mockito.mock(BulletinRepository.class), clusterCoordinator,
heartbeatMonitor, electionManager,
VariableRegistry.EMPTY_REGISTRY, Mockito.mock(FlowRegistryClient.class),
extensionManager,
- revisionManager);
+ revisionManager, statusHistoryRepository);
try {
flowController.initializeFlow();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 751b6aa..b409103 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -240,7 +240,6 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
public static final String DEFAULT_CONTENT_REPO_IMPLEMENTATION =
"org.apache.nifi.controller.repository.FileSystemRepository";
public static final String DEFAULT_PROVENANCE_REPO_IMPLEMENTATION =
"org.apache.nifi.provenance.VolatileProvenanceRepository";
public static final String DEFAULT_SWAP_MANAGER_IMPLEMENTATION =
"org.apache.nifi.controller.FileSystemSwapManager";
- public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION =
"org.apache.nifi.controller.status.history.VolatileComponentStatusRepository";
private static final String ENCRYPTED_PROVENANCE_REPO_IMPLEMENTATION =
"org.apache.nifi.provenance.EncryptedWriteAheadProvenanceRepository";
private static final String ENCRYPTED_CONTENT_REPO_IMPLEMENTATION =
"org.apache.nifi.controller.repository.crypto.EncryptedFileSystemRepository";
@@ -393,7 +392,8 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
final BulletinRepository bulletinRepo,
final VariableRegistry variableRegistry,
final FlowRegistryClient flowRegistryClient,
- final ExtensionManager extensionManager) {
+ final ExtensionManager extensionManager,
+ final StatusHistoryRepository statusHistoryRepository) {
return new FlowController(
flowFileEventRepo,
@@ -410,7 +410,8 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
/* variable registry */ variableRegistry,
flowRegistryClient,
extensionManager,
- null);
+ null,
+ statusHistoryRepository);
}
public static FlowController createClusteredInstance(
@@ -427,7 +428,8 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
final VariableRegistry variableRegistry,
final FlowRegistryClient flowRegistryClient,
final ExtensionManager extensionManager,
- final RevisionManager revisionManager) {
+ final RevisionManager revisionManager,
+ final StatusHistoryRepository statusHistoryRepository) {
final FlowController flowController = new FlowController(
flowFileEventRepo,
@@ -444,7 +446,8 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
variableRegistry,
flowRegistryClient,
extensionManager,
- revisionManager);
+ revisionManager,
+ statusHistoryRepository);
return flowController;
}
@@ -465,7 +468,8 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
final VariableRegistry variableRegistry,
final FlowRegistryClient flowRegistryClient,
final ExtensionManager extensionManager,
- final RevisionManager revisionManager) {
+ final RevisionManager revisionManager,
+ final StatusHistoryRepository statusHistoryRepository) {
maxTimerDrivenThreads = new AtomicInteger(10);
maxEventDrivenThreads = new AtomicInteger(1);
@@ -481,6 +485,7 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
this.configuredForClustering = configuredForClustering;
this.flowRegistryClient = flowRegistryClient;
this.revisionManager = revisionManager;
+ this.statusHistoryRepository = statusHistoryRepository;
try {
// Form the container object from the properties
@@ -638,8 +643,6 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
zooKeeperStateServer = null;
}
- statusHistoryRepository = createStatusHistoryRepository();
-
final boolean analyticsEnabled =
Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.ANALYTICS_PREDICTION_ENABLED,
NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_ENABLED));
if (analyticsEnabled) {
@@ -1185,22 +1188,6 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
}
}
- private StatusHistoryRepository createStatusHistoryRepository() {
- final String implementationClassName =
nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION,
DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
- if (implementationClassName == null) {
- throw new RuntimeException("Cannot create Status History
Repository because the NiFi Properties is missing the following property: "
- +
NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
- }
-
- try {
- final StatusHistoryRepository repository =
NarThreadContextClassLoader.createInstance(extensionManager,
implementationClassName, StatusHistoryRepository.class, nifiProperties);
- repository.start();
- return repository;
- } catch (final Exception e) {
- throw new RuntimeException(e);
- }
- }
-
public KerberosConfig createKerberosConfig(final NiFiProperties
nifiProperties) {
final String principal = nifiProperties.getKerberosServicePrincipal();
final String keytabLocation =
nifiProperties.getKerberosServiceKeytabLocation();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/JsonNodeStatusHistoryDump.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/JsonNodeStatusHistoryDump.java
new file mode 100644
index 0000000..7215a03
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/JsonNodeStatusHistoryDump.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import com.fasterxml.jackson.core.util.DefaultIndenter;
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+final class JsonNodeStatusHistoryDump implements StatusHistoryDump {
+
+ private final StatusHistory nodeStatusHistory;
+
+ JsonNodeStatusHistoryDump(final StatusHistory nodeStatusHistory) {
+ this.nodeStatusHistory = nodeStatusHistory;
+ }
+
+ @Override
+ public void writeTo(final OutputStream out) throws IOException {
+ final ObjectMapper objectMapper = new ObjectMapper();
+ final DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter();
+
prettyPrinter.indentArraysWith(DefaultIndenter.SYSTEM_LINEFEED_INSTANCE);
+ final StatusHistoryDTO statusHistoryDTO =
StatusHistoryUtil.createStatusHistoryDTO(nodeStatusHistory);
+ objectMapper.writer(prettyPrinter).writeValue(out, statusHistoryDTO);
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/JsonNodeStatusHistoryDumpFactory.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/JsonNodeStatusHistoryDumpFactory.java
new file mode 100644
index 0000000..fac963a
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/JsonNodeStatusHistoryDumpFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import com.google.common.base.Preconditions;
+
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.util.Date;
+
+public class JsonNodeStatusHistoryDumpFactory implements
StatusHistoryDumpFactory {
+
+ private StatusHistoryRepository statusHistoryRepository;
+
+ @Override
+ public StatusHistoryDump create(int days) {
+ Preconditions.checkArgument(days > 0, String.format("The number of
days shall be greater than 0. The current value is %s.", days));
+ final LocalDateTime endOfToday =
LocalDateTime.now().with(LocalTime.MAX);
+ final LocalDateTime startOfDaysBefore =
endOfToday.minusDays(days).with(LocalTime.MIN);
+
+ final Date endOfTodayDate =
Date.from(endOfToday.atZone(ZoneId.systemDefault()).toInstant());
+ final Date startOfDaysBeforeDate =
Date.from(startOfDaysBefore.atZone(ZoneId.systemDefault()).toInstant());
+
+ final StatusHistory nodeStatusHistory =
statusHistoryRepository.getNodeStatusHistory(startOfDaysBeforeDate,
endOfTodayDate);
+ return new JsonNodeStatusHistoryDump(nodeStatusHistory);
+ }
+
+ public void setStatusHistoryRepository(StatusHistoryRepository
statusHistoryRepository) {
+ this.statusHistoryRepository = statusHistoryRepository;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
index c373b23..6e005a5 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
@@ -24,6 +24,7 @@ import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.registry.VariableRegistry;
@@ -55,6 +56,7 @@ public class FlowControllerFactoryBean implements
FactoryBean, ApplicationContex
private FlowRegistryClient flowRegistryClient;
private ExtensionManager extensionManager;
private RevisionManager revisionManager;
+ private StatusHistoryRepository statusHistoryRepository;
@Override
public Object getObject() throws Exception {
@@ -78,7 +80,8 @@ public class FlowControllerFactoryBean implements
FactoryBean, ApplicationContex
variableRegistry,
flowRegistryClient,
extensionManager,
- revisionManager);
+ revisionManager,
+ statusHistoryRepository);
} else {
flowController = FlowController.createStandaloneInstance(
flowFileEventRepository,
@@ -89,7 +92,8 @@ public class FlowControllerFactoryBean implements
FactoryBean, ApplicationContex
bulletinRepository,
variableRegistry,
flowRegistryClient,
- extensionManager);
+ extensionManager,
+ statusHistoryRepository);
}
}
@@ -157,4 +161,8 @@ public class FlowControllerFactoryBean implements
FactoryBean, ApplicationContex
public void setRevisionManager(final RevisionManager revisionManager) {
this.revisionManager = revisionManager;
}
+
+ /**
+  * Sets the status history repository that is passed to the FlowController when it is created.
+  *
+  * @param statusHistoryRepository the repository to hand over to the FlowController
+  */
+ public void setStatusHistoryRepository(final StatusHistoryRepository statusHistoryRepository) {
+     this.statusHistoryRepository = statusHistoryRepository;
+ }
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/StatusHistoryRepositoryFactoryBean.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/StatusHistoryRepositoryFactoryBean.java
new file mode 100644
index 0000000..79e0deb
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/StatusHistoryRepositoryFactoryBean.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.spring;
+
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarThreadContextClassLoader;
+import org.apache.nifi.util.NiFiProperties;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.BeanCreationException;
+import org.springframework.beans.factory.FactoryBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+/**
+ * Factory bean for creating a singleton StatusHistoryRepository instance.
+ */
+public class StatusHistoryRepositoryFactoryBean implements FactoryBean<StatusHistoryRepository>, ApplicationContextAware {
+
+    private static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = "org.apache.nifi.controller.status.history.VolatileComponentStatusRepository";
+
+    // Stored because of ApplicationContextAware; not otherwise used within this class.
+    private ApplicationContext applicationContext;
+    private NiFiProperties nifiProperties;
+    private ExtensionManager extensionManager;
+    // Holds the created and started repository so that repeated getObject() calls return the same instance.
+    private StatusHistoryRepository statusHistoryRepository;
+
+    /**
+     * Returns the status history repository, creating and starting it on the first call.
+     * The implementation class name is read from
+     * {@link NiFiProperties#COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION}, falling back to the
+     * volatile component status repository when the property is not set.
+     *
+     * @return the started repository; the same instance on every call
+     * @throws BeanCreationException if the implementation class name cannot be determined or the
+     *         repository cannot be instantiated or started
+     */
+    @Override
+    public StatusHistoryRepository getObject() throws Exception {
+        if (statusHistoryRepository != null) {
+            return statusHistoryRepository;
+        }
+
+        final String implementationClassName = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
+        if (implementationClassName == null) {
+            throw new BeanCreationException("Cannot create Status History Repository because the NiFi Properties is missing the following property: "
+                    + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
+        }
+
+        try {
+            final StatusHistoryRepository repository =
+                    NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, StatusHistoryRepository.class, nifiProperties);
+            repository.start();
+            // Publish only after a successful start so a failed attempt never leaves a half-initialized instance cached.
+            statusHistoryRepository = repository;
+            return repository;
+        } catch (final Exception e) {
+            throw new BeanCreationException("Failed to create Status History Repository of type " + implementationClassName, e);
+        }
+    }
+
+    @Override
+    public Class<?> getObjectType() {
+        return StatusHistoryRepository.class;
+    }
+
+    /**
+     * Declares the produced repository as a singleton, matching the instance caching in {@link #getObject()}.
+     */
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        this.applicationContext = applicationContext;
+    }
+
+    public void setNifiProperties(NiFiProperties nifiProperties) {
+        this.nifiProperties = nifiProperties;
+    }
+
+    public void setExtensionManager(ExtensionManager extensionManager) {
+        this.extensionManager = extensionManager;
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
index ef1810c..040329b 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
@@ -57,6 +57,7 @@
<property name="flowRegistryClient" ref="flowRegistryClient" />
<property name="extensionManager" ref="extensionManager" />
<property name="revisionManager" ref="revisionManager" />
+ <property name="statusHistoryRepository" ref="statusHistoryRepository"
/>
</bean>
<!-- flow service -->
@@ -77,4 +78,13 @@
<bean id="eventReporter"
class="org.apache.nifi.events.StandardEventReporter">
<constructor-arg ref="bulletinRepository" />
</bean>
+
+ <bean id="statusHistoryRepository"
class="org.apache.nifi.spring.StatusHistoryRepositoryFactoryBean">
+ <property name="nifiProperties" ref="nifiProperties"/>
+ <property name="extensionManager" ref="extensionManager" />
+ </bean>
+
+ <bean id="statusHistoryDumpFactory"
class="org.apache.nifi.controller.status.history.JsonNodeStatusHistoryDumpFactory">
+ <property name="statusHistoryRepository" ref="statusHistoryRepository"
/>
+ </bean>
</beans>
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java
index 1870841..3abe468 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java
@@ -24,6 +24,7 @@ import
org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.ScheduledStateLookup;
import org.apache.nifi.controller.serialization.StandardFlowSerializer;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.events.VolatileBulletinRepository;
import org.apache.nifi.nar.ExtensionDiscoveringManager;
@@ -72,6 +73,7 @@ public class StandardFlowServiceTest {
private RevisionManager revisionManager;
private VariableRegistry variableRegistry;
private ExtensionManager extensionManager;
+ private StatusHistoryRepository statusHistoryRepository;
@BeforeClass
public static void setupSuite() {
@@ -91,8 +93,9 @@ public class StandardFlowServiceTest {
revisionManager = mock(RevisionManager.class);
extensionManager = mock(ExtensionDiscoveringManager.class);
+ // The mock must exist before it is handed to the FlowController; assigning it afterwards passes null.
+ statusHistoryRepository = mock(StatusHistoryRepository.class);
flowController = FlowController.createStandaloneInstance(mockFlowFileEventRepository, properties, authorizer, mockAuditService, mockEncryptor,
- new VolatileBulletinRepository(), variableRegistry, mock(FlowRegistryClient.class), extensionManager);
+ new VolatileBulletinRepository(), variableRegistry, mock(FlowRegistryClient.class), extensionManager, statusHistoryRepository);
flowService = StandardFlowService.createStandaloneInstance(flowController, properties, mockEncryptor, revisionManager, authorizer);
}
@Test
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardProcessorNodeIT.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardProcessorNodeIT.java
index bc468ff3..832050a 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardProcessorNodeIT.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardProcessorNodeIT.java
@@ -33,6 +33,7 @@ import org.apache.nifi.controller.kerberos.KerberosConfig;
import
org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.VolatileBulletinRepository;
import org.apache.nifi.expression.ExpressionLanguageCompiler;
@@ -231,7 +232,7 @@ public class StandardProcessorNodeIT {
final FlowController flowController =
FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class),
nifiProperties,
mock(Authorizer.class), mock(AuditService.class), null, new
VolatileBulletinRepository(),
new
FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()),
- mock(FlowRegistryClient.class), extensionManager);
+ mock(FlowRegistryClient.class), extensionManager,
mock(StatusHistoryRepository.class));
// Init processor
final PropertyDescriptor classpathProp = new
PropertyDescriptor.Builder().name("Classpath Resources")
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
index db6051b..c0c86bd 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
@@ -44,6 +44,7 @@ import org.apache.nifi.controller.service.mock.DummyProcessor;
import org.apache.nifi.controller.service.mock.DummyReportingTask;
import org.apache.nifi.controller.service.mock.ServiceA;
import org.apache.nifi.controller.service.mock.ServiceB;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.encrypt.PropertyEncryptorFactory;
import org.apache.nifi.groups.ProcessGroup;
@@ -79,7 +80,6 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
import java.io.File;
import java.io.FileInputStream;
@@ -124,12 +124,13 @@ public class TestFlowController {
private BulletinRepository bulletinRepo;
private VariableRegistry variableRegistry;
private ExtensionDiscoveringManager extensionManager;
+ private StatusHistoryRepository statusHistoryRepository;
@Before
public void setup() {
- flowFileEventRepo = Mockito.mock(FlowFileEventRepository.class);
- auditService = Mockito.mock(AuditService.class);
+ flowFileEventRepo = mock(FlowFileEventRepository.class);
+ auditService = mock(AuditService.class);
final Map<String, String> otherProps = new HashMap<>();
otherProps.put(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS,
MockProvenanceRepository.class.getName());
otherProps.put("nifi.remote.input.socket.port", "");
@@ -143,6 +144,8 @@ public class TestFlowController {
extensionManager = new StandardExtensionDiscoveringManager();
extensionManager.discoverExtensions(systemBundle,
Collections.emptySet());
+ statusHistoryRepository = mock(StatusHistoryRepository.class);
+
User user1 = new
User.Builder().identifier("user-id-1").identity("user-1").build();
User user2 = new
User.Builder().identifier("user-id-2").identity("user-2").build();
@@ -182,9 +185,9 @@ public class TestFlowController {
authorizer = new MockPolicyBasedAuthorizer(groups1, users1, policies1);
variableRegistry = new
FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths());
- bulletinRepo = Mockito.mock(BulletinRepository.class);
+ bulletinRepo = mock(BulletinRepository.class);
controller =
FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties,
authorizer,
- auditService, encryptor, bulletinRepo, variableRegistry,
Mockito.mock(FlowRegistryClient.class), extensionManager);
+ auditService, encryptor, bulletinRepo, variableRegistry,
mock(FlowRegistryClient.class), extensionManager, statusHistoryRepository);
}
@After
@@ -205,7 +208,7 @@ public class TestFlowController {
final DataFlow proposedDataFlow = new
StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null,
authFingerprint.getBytes(StandardCharsets.UTF_8), Collections.emptySet());
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
mock(FlowService.class));
// should be two controller services
final Set<ControllerServiceNode> controllerServiceNodes =
controller.getFlowManager().getAllControllerServices();
@@ -267,7 +270,7 @@ public class TestFlowController {
final DataFlow proposedDataFlow = new
StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null,
authFingerprint.getBytes(StandardCharsets.UTF_8), Collections.emptySet());
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
mock(FlowService.class));
try {
// should be two controller services
@@ -287,10 +290,10 @@ public class TestFlowController {
// verify the processor is still pointing at the controller
service that got moved to the root group
final ProcessorNode processorNode =
processorNodes.stream().findFirst().get();
final PropertyDescriptor procControllerServiceProp =
processorNode.getRawPropertyValues().entrySet().stream()
- .filter(e -> e.getValue().equals(rootGroupCs.getIdentifier()))
- .map(Map.Entry::getKey)
- .findFirst()
- .get();
+ .filter(e ->
e.getValue().equals(rootGroupCs.getIdentifier()))
+ .map(Map.Entry::getKey)
+ .findFirst()
+ .get();
assertNotNull(procControllerServiceProp);
} finally {
purgeFlow();
@@ -304,10 +307,10 @@ public class TestFlowController {
// create a mock proposed data flow with the same auth fingerprint as
the current authorizer
final String authFingerprint = authorizer.getFingerprint();
- final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
+ final DataFlow proposedDataFlow = mock(DataFlow.class);
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8));
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
mock(FlowService.class));
assertEquals(authFingerprint, authorizer.getFingerprint());
}
@@ -323,11 +326,11 @@ public class TestFlowController {
final String authFingerprint = "<authorizations></authorizations>";
final DataFlow proposedDataFlow = new
StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null,
authFingerprint.getBytes(StandardCharsets.UTF_8), Collections.emptySet());
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
mock(FlowService.class));
controller.initializeFlow();
try {
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
mock(FlowService.class));
assertNotEquals(authFingerprint, authorizer.getFingerprint());
} finally {
purgeFlow();
@@ -346,7 +349,7 @@ public class TestFlowController {
final DataFlow proposedDataFlow = new
StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null,
authFingerprint.getBytes(StandardCharsets.UTF_8), Collections.emptySet());
try {
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
mock(FlowService.class));
controller.initializeFlow();
} finally {
purgeFlow();
@@ -365,7 +368,7 @@ public class TestFlowController {
final DataFlow proposedDataFlow = new
StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null,
authFingerprint.getBytes(StandardCharsets.UTF_8), Collections.emptySet());
try {
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
mock(FlowService.class));
controller.initializeFlow();
ParameterContext parameterContext =
controller.getFlowManager().getParameterContextManager().getParameterContext("context");
@@ -390,7 +393,7 @@ public class TestFlowController {
final DataFlow proposedDataFlow = new
StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null,
authFingerprint.getBytes(StandardCharsets.UTF_8), Collections.emptySet());
try {
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
mock(FlowService.class));
controller.initializeFlow();
final Map<String, Parameter> parameters = new HashMap<>();
@@ -436,10 +439,10 @@ public class TestFlowController {
// create a mock proposed data flow with different auth fingerprint as
the current authorizer
final String authFingerprint = "<authorizations></authorizations>";
- final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
+ final DataFlow proposedDataFlow = mock(DataFlow.class);
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8));
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
mock(FlowService.class));
assertNotEquals(authFingerprint, authorizer.getFingerprint());
assertTrue(authorizer.getGroups().isEmpty());
@@ -457,14 +460,14 @@ public class TestFlowController {
final String authFingerprint = "<authorizations></authorizations>";
final DataFlow proposedDataFlow = new
StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null,
authFingerprint.getBytes(StandardCharsets.UTF_8), Collections.emptySet());
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
mock(FlowService.class));
controller.initializeFlow();
final DataFlow dataflowWithNullAuthorizations = new
StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null, null,
Collections.emptySet());
try {
- controller.synchronize(standardFlowSynchronizer,
dataflowWithNullAuthorizations, Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer,
dataflowWithNullAuthorizations, mock(FlowService.class));
Assert.fail("Was able to synchronize controller with null
authorizations but dataflow wasn't empty");
} catch (final UninheritableFlowException expected) {
// expected
@@ -478,10 +481,10 @@ public class TestFlowController {
final FlowSynchronizer standardFlowSynchronizer = new
StandardFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties),
nifiProperties, extensionManager);
- final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
+ final DataFlow proposedDataFlow = mock(DataFlow.class);
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(null);
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
mock(FlowService.class));
assertTrue(authorizer.getGroups().isEmpty());
assertTrue(authorizer.getUsers().isEmpty());
@@ -512,7 +515,7 @@ public class TestFlowController {
// create a mock proposed data flow with the same auth fingerprint as
the current authorizer
final String authFingerprint = authorizer.getFingerprint();
- final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
+ final DataFlow proposedDataFlow = mock(DataFlow.class);
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8));
authorizer = new MockPolicyBasedAuthorizer();
@@ -520,8 +523,8 @@ public class TestFlowController {
controller.shutdown(true);
controller =
FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties,
authorizer,
- auditService, encryptor, bulletinRepo, variableRegistry,
Mockito.mock(FlowRegistryClient.class), extensionManager);
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
Mockito.mock(FlowService.class));
+ auditService, encryptor, bulletinRepo, variableRegistry,
mock(FlowRegistryClient.class), extensionManager, statusHistoryRepository);
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
mock(FlowService.class));
assertEquals(authFingerprint, authorizer.getFingerprint());
}
@@ -534,11 +537,11 @@ public class TestFlowController {
missingComponents.add("1");
missingComponents.add("2");
- final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
+ final DataFlow proposedDataFlow = mock(DataFlow.class);
when(proposedDataFlow.getMissingComponents()).thenReturn(missingComponents);
try {
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow,
mock(FlowService.class));
Assert.fail("Should have thrown exception");
} catch (UninheritableFlowException e) {
assertTrue(e.getMessage().contains("Proposed flow has missing
components that are not considered missing in the current flow (1,2)"));
@@ -568,7 +571,7 @@ public class TestFlowController {
final SnippetManager mockSnippetManager = mock(SnippetManager.class);
when(mockSnippetManager.export()).thenReturn(new byte[0]);
- final FlowManager flowManager = Mockito.mock(FlowManager.class);
+ final FlowManager flowManager = mock(FlowManager.class);
final FlowController mockFlowController = mock(FlowController.class);
when(mockFlowController.getFlowManager()).thenReturn(flowManager);
@@ -579,11 +582,11 @@ public class TestFlowController {
when(mockFlowController.getAuthorizer()).thenReturn(authorizer);
when(mockFlowController.getSnippetManager()).thenReturn(mockSnippetManager);
- final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
+ final DataFlow proposedDataFlow = mock(DataFlow.class);
when(proposedDataFlow.getMissingComponents()).thenReturn(new
HashSet<>());
try {
- standardFlowSynchronizer.sync(mockFlowController,
proposedDataFlow, encryptor, Mockito.mock(FlowService.class));
+ standardFlowSynchronizer.sync(mockFlowController,
proposedDataFlow, encryptor, mock(FlowService.class));
Assert.fail("Should have thrown exception");
} catch (UninheritableFlowException e) {
assertTrue(e.getMessage(), e.getMessage().contains("Current flow
has missing components that are not considered missing in the proposed flow
(1,2,3)"));
@@ -636,7 +639,7 @@ public class TestFlowController {
final byte[] authFingerprintBytes =
authFingerprint.getBytes(StandardCharsets.UTF_8);
final DataFlow proposedDataFlow1 = new StandardDataFlow(flowBytes,
null, authFingerprintBytes, Collections.emptySet());
- controller.synchronize(standardFlowSynchronizer, proposedDataFlow1,
Mockito.mock(FlowService.class));
+ controller.synchronize(standardFlowSynchronizer, proposedDataFlow1,
mock(FlowService.class));
}
@Test
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java
index 11a0ad5..a14a730 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java
@@ -31,6 +31,7 @@ import org.apache.nifi.controller.DummyScheduledReportingTask;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.encrypt.PropertyEncryptorFactory;
import org.apache.nifi.nar.ExtensionDiscoveringManager;
@@ -69,6 +70,7 @@ public class TestStandardReportingContext {
private BulletinRepository bulletinRepo;
private VariableRegistry variableRegistry;
private FlowRegistryClient flowRegistry;
+ private StatusHistoryRepository statusHistoryRepository;
private volatile String propsFile =
TestStandardReportingContext.class.getResource("/flowcontrollertest.nifi.properties").getFile();
@Before
@@ -87,6 +89,8 @@ public class TestStandardReportingContext {
extensionManager = new StandardExtensionDiscoveringManager();
extensionManager.discoverExtensions(systemBundle,
Collections.emptySet());
+ statusHistoryRepository = Mockito.mock(StatusHistoryRepository.class);
+
User user1 = new
User.Builder().identifier("user-id-1").identity("user-1").build();
User user2 = new
User.Builder().identifier("user-id-2").identity("user-2").build();
@@ -129,7 +133,7 @@ public class TestStandardReportingContext {
bulletinRepo = Mockito.mock(BulletinRepository.class);
controller =
FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties,
authorizer, auditService, encryptor,
- bulletinRepo, variableRegistry, flowRegistry,
extensionManager);
+ bulletinRepo, variableRegistry, flowRegistry,
extensionManager, statusHistoryRepository);
}
@After
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
index 1c7f187..b2f38fe 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
@@ -35,6 +35,7 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.events.VolatileBulletinRepository;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.ExtensionDiscoveringManager;
@@ -558,9 +559,9 @@ public class ProcessorLifecycleIT {
extensionManager.discoverExtensions(systemBundle,
Collections.emptySet());
final FlowController flowController =
FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class),
nifiProperties,
- mock(Authorizer.class), mock(AuditService.class), null, new
VolatileBulletinRepository(),
- new
FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()),
- mock(FlowRegistryClient.class), extensionManager);
+ mock(Authorizer.class), mock(AuditService.class), null, new
VolatileBulletinRepository(),
+ new
FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()),
+ mock(FlowRegistryClient.class), extensionManager,
mock(StatusHistoryRepository.class));
final FlowManager flowManager = flowController.getFlowManager();
this.processScheduler = flowController.getProcessScheduler();
@@ -593,9 +594,11 @@ public class ProcessorLifecycleIT {
}
/**
+ *
*/
public static class TestProcessor extends AbstractProcessor {
- private static final Runnable NOP = () -> {};
+ private static final Runnable NOP = () -> {
+ };
private Runnable onScheduleCallback = NOP;
private Runnable onUnscheduleCallback = NOP;
@@ -613,8 +616,7 @@ public class ProcessorLifecycleIT {
private final List<String> operationNames = new LinkedList<>();
- void setScenario(Runnable onScheduleCallback, Runnable
onUnscheduleCallback, Runnable onStopCallback,
- Runnable onTriggerCallback) {
+ void setScenario(Runnable onScheduleCallback, Runnable
onUnscheduleCallback, Runnable onStopCallback, Runnable onTriggerCallback) {
this.onScheduleCallback = onScheduleCallback;
this.onUnscheduleCallback = onUnscheduleCallback;
this.onStopCallback = onStopCallback;
@@ -731,6 +733,7 @@ public class ProcessorLifecycleIT {
this.delayLimit = delayLimit;
this.randomDelay = randomDelay;
}
+
Random random = new Random();
@Override
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
index 4129f12..377778d 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
@@ -25,6 +25,7 @@ import org.apache.nifi.controller.DummyScheduledProcessor;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.encrypt.PropertyEncryptorFactory;
import org.apache.nifi.nar.ExtensionDiscoveringManager;
@@ -100,7 +101,7 @@ public class StandardFlowSerializerTest {
final BulletinRepository bulletinRepo =
Mockito.mock(BulletinRepository.class);
controller =
FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties,
authorizer,
- auditService, encryptor, bulletinRepo, variableRegistry,
Mockito.mock(FlowRegistryClient.class), extensionManager);
+ auditService, encryptor, bulletinRepo, variableRegistry,
Mockito.mock(FlowRegistryClient.class), extensionManager,
Mockito.mock(StatusHistoryRepository.class));
serializer = new StandardFlowSerializer(encryptor);
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/JsonNodeStatusHistoryDumpFactoryTest.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/JsonNodeStatusHistoryDumpFactoryTest.java
new file mode 100644
index 0000000..fc331bf
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/JsonNodeStatusHistoryDumpFactoryTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.util.Date;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class JsonNodeStatusHistoryDumpFactoryTest {
+
+ private static final String EXPECTED_EXCEPTION_MESSAGE = "The number of
days shall be greater than 0. The current value is %s.";
+
+ @Test
+ public void testJsonNodeStatusDumpFactory() {
+ final int days = 3;
+ final StatusHistoryRepository statusHistoryRepository =
mock(StatusHistoryRepository.class);
+ final ArgumentCaptor<Date> fromArgumentCaptor =
ArgumentCaptor.forClass(Date.class);
+ final ArgumentCaptor<Date> toArgumentCaptor =
ArgumentCaptor.forClass(Date.class);
+
+ JsonNodeStatusHistoryDumpFactory factory = new
JsonNodeStatusHistoryDumpFactory();
+ factory.setStatusHistoryRepository(statusHistoryRepository);
+
+ factory.create(days);
+
+
verify(statusHistoryRepository).getNodeStatusHistory(fromArgumentCaptor.capture(),
toArgumentCaptor.capture());
+
+ final LocalDateTime endOfToday =
LocalDateTime.now().with(LocalTime.MAX);
+ final LocalDateTime startOfDaysBefore =
endOfToday.minusDays(days).with(LocalTime.MIN);
+
+ final Date endOfTodayDate =
Date.from(endOfToday.atZone(ZoneId.systemDefault()).toInstant());
+ final Date startOfDaysBeforeDate =
Date.from(startOfDaysBefore.atZone(ZoneId.systemDefault()).toInstant());
+
+ assertEquals(endOfTodayDate, toArgumentCaptor.getValue());
+ assertEquals(startOfDaysBeforeDate, fromArgumentCaptor.getValue());
+ }
+
+ @Test
+ public void
testJsonNodeStatusDumpFactoryWithLessThanOneDayThrowsException() {
+ final int zeroDays = 0;
+ final int negativeDays = -1;
+ final StatusHistoryRepository statusHistoryRepository =
mock(StatusHistoryRepository.class);
+
+ JsonNodeStatusHistoryDumpFactory factory = new
JsonNodeStatusHistoryDumpFactory();
+ factory.setStatusHistoryRepository(statusHistoryRepository);
+
+ final IllegalArgumentException zeroDaysException =
Assert.assertThrows(IllegalArgumentException.class,
+ () -> factory.create(zeroDays)
+ );
+
+ assertEquals(String.format(EXPECTED_EXCEPTION_MESSAGE, zeroDays),
zeroDaysException.getMessage());
+
+ final IllegalArgumentException negativeDaysException =
Assert.assertThrows(IllegalArgumentException.class,
+ () -> factory.create(negativeDays)
+ );
+
+ assertEquals(String.format(EXPECTED_EXCEPTION_MESSAGE, negativeDays),
negativeDaysException.getMessage());
+ }
+
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
index f5505b6..3f4e7d6 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
@@ -157,6 +157,7 @@ public class FrameworkIntegrationTest {
private Bundle systemBundle;
private ClusterCoordinator clusterCoordinator;
private NiFiProperties nifiProperties;
+ private StatusHistoryRepository statusHistoryRepository;
public static final Relationship REL_SUCCESS = new
Relationship.Builder().name("success").build();
@@ -231,6 +232,8 @@ public class FrameworkIntegrationTest {
systemBundle = SystemBundle.create(nifiProperties);
extensionManager.discoverExtensions(systemBundle,
Collections.emptySet());
+ statusHistoryRepository = Mockito.mock(StatusHistoryRepository.class);
+
final PropertyEncryptor encryptor = createEncryptor();
final Authorizer authorizer = new AlwaysAuthorizedAuthorizer();
final AuditService auditService = new NopAuditService();
@@ -266,8 +269,9 @@ public class FrameworkIntegrationTest {
Mockito.when(clusterCoordinator.getNodeIdentifiers()).thenReturn(nodeIdentifiers);
Mockito.when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(localNodeId);
- flowController =
FlowController.createClusteredInstance(flowFileEventRepository, nifiProperties,
authorizer, auditService, encryptor, protocolSender, bulletinRepo,
clusterCoordinator,
- heartbeatMonitor, leaderElectionManager,
VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY, flowRegistryClient,
extensionManager, Mockito.mock(RevisionManager.class));
+ flowController =
FlowController.createClusteredInstance(flowFileEventRepository, nifiProperties,
authorizer, auditService, encryptor, protocolSender,
+ bulletinRepo, clusterCoordinator, heartbeatMonitor,
leaderElectionManager, VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY,
flowRegistryClient,
+ extensionManager, Mockito.mock(RevisionManager.class),
statusHistoryRepository);
flowController.setClustered(true, UUID.randomUUID().toString());
flowController.setNodeId(localNodeId);
@@ -275,7 +279,7 @@ public class FrameworkIntegrationTest {
flowController.setConnectionStatus(new
NodeConnectionStatus(localNodeId, NodeConnectionState.CONNECTED));
} else {
flowController =
FlowController.createStandaloneInstance(flowFileEventRepository,
nifiProperties, authorizer, auditService, encryptor, bulletinRepo,
- VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY,
flowRegistryClient, extensionManager);
+ VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY,
flowRegistryClient, extensionManager, statusHistoryRepository);
}
processScheduler = new StandardProcessScheduler(flowEngine,
flowController, encryptor, flowController.getStateManagerProvider(),
nifiProperties);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
index c2b5143..1118995 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
@@ -34,6 +34,8 @@ import org.apache.nifi.controller.StandardFlowService;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
+import org.apache.nifi.controller.status.history.StatusHistoryDumpFactory;
+import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.diagnostics.DiagnosticsDump;
import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
import org.apache.nifi.diagnostics.DiagnosticsFactory;
@@ -51,6 +53,7 @@ import
org.apache.nifi.registry.flow.StandardFlowRegistryClient;
import org.apache.nifi.registry.variable.FileBasedVariableRegistry;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.services.FlowService;
+import org.apache.nifi.spring.StatusHistoryRepositoryFactoryBean;
import org.apache.nifi.util.FlowParser;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
@@ -64,6 +67,7 @@ import java.util.List;
import java.util.Set;
/**
+ *
*/
public class HeadlessNiFiServer implements NiFiServer {
@@ -128,6 +132,11 @@ public class HeadlessNiFiServer implements NiFiServer {
StandardFlowRegistryClient flowRegistryClient = new
StandardFlowRegistryClient();
flowRegistryClient.setProperties(props);
+ final StatusHistoryRepositoryFactoryBean
statusHistoryRepositoryFactoryBean = new StatusHistoryRepositoryFactoryBean();
+ statusHistoryRepositoryFactoryBean.setNifiProperties(props);
+
statusHistoryRepositoryFactoryBean.setExtensionManager(extensionManager);
+ StatusHistoryRepository statusHistoryRepository =
statusHistoryRepositoryFactoryBean.getObject();
+
flowController = FlowController.createStandaloneInstance(
flowFileEventRepository,
props,
@@ -137,7 +146,8 @@ public class HeadlessNiFiServer implements NiFiServer {
bulletinRepository,
variableRegistry,
flowRegistryClient,
- extensionManager);
+ extensionManager,
+ statusHistoryRepository);
flowService = StandardFlowService.createStandaloneInstance(
flowController,
@@ -199,6 +209,11 @@ public class HeadlessNiFiServer implements NiFiServer {
return null;
}
+ @Override
+ public StatusHistoryDumpFactory getStatusHistoryDumpFactory() {
+ return null;
+ }
+
public void stop() {
try {
flowService.stop(false);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
index 6d36bdf..f70e896 100755
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
@@ -455,7 +455,7 @@ case "$1" in
install "$@"
;;
-
start|stop|decommission|run|status|is_loaded|dump|diagnostics|env|stateless|set-sensitive-properties-key|set-single-user-credentials)
+
start|stop|decommission|run|status|is_loaded|dump|diagnostics|status-history|env|stateless|set-sensitive-properties-key|set-single-user-credentials)
main "$@"
;;
@@ -465,6 +465,6 @@ case "$1" in
run "start"
;;
*)
- echo "Usage nifi
{start|stop|decommission|run|restart|status|dump|diagnostics|install|stateless|set-sensitive-properties-key|set-single-user-credentials}"
+ echo "Usage nifi
{start|stop|decommission|run|restart|status|dump|diagnostics|status-history|install|stateless|set-sensitive-properties-key|set-single-user-credentials}"
;;
esac
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
index b9b12ec..574ea29 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
@@ -17,6 +17,7 @@
package org.apache.nifi;
import org.apache.nifi.controller.DecommissionTask;
+import org.apache.nifi.controller.status.history.StatusHistoryDump;
import org.apache.nifi.diagnostics.DiagnosticsDump;
import org.apache.nifi.util.LimitingInputStream;
import org.slf4j.Logger;
@@ -72,7 +73,7 @@ public class BootstrapListener {
listenThread.start();
logger.debug("Notifying Bootstrap that local port is {}", localPort);
- sendCommand("PORT", new String[] { String.valueOf(localPort),
secretKey});
+ sendCommand("PORT", new String[]{String.valueOf(localPort),
secretKey});
}
public void reload() throws IOException {
@@ -94,7 +95,7 @@ public class BootstrapListener {
public void sendStartedStatus(boolean status) throws IOException {
logger.debug("Notifying Bootstrap that the status of starting NiFi is
{}", status);
- sendCommand("STARTED", new String[]{ String.valueOf(status) });
+ sendCommand("STARTED", new String[]{String.valueOf(status)});
}
private void sendCommand(final String command, final String[] args) throws
IOException {
@@ -238,6 +239,12 @@ public class BootstrapListener {
writeDiagnostics(socket.getOutputStream(), verbose);
break;
+ case STATUS_HISTORY:
+ logger.info("Received STATUS_HISTORY
request from Bootstrap");
+ final String[] statusHistoryArgs =
request.getArgs();
+ final int days =
Integer.parseInt(statusHistoryArgs[0]);
+
writeNodeStatusHistory(socket.getOutputStream(), days);
+ break;
case IS_LOADED:
logger.debug("Received IS_LOADED
request from Bootstrap");
String answer =
String.valueOf(nifiLoaded);
@@ -282,6 +289,11 @@ public class BootstrapListener {
diagnosticsDump.writeTo(out);
}
+ private void writeNodeStatusHistory(final OutputStream out, final int
days) throws IOException {
+ final StatusHistoryDump statusHistoryDump =
nifi.getServer().getStatusHistoryDumpFactory().create(days);
+ statusHistoryDump.writeTo(out);
+ }
+
private void sendAnswer(final OutputStream out, final String answer)
throws IOException {
out.write((answer + "\n").getBytes(StandardCharsets.UTF_8));
out.flush();
@@ -333,7 +345,8 @@ public class BootstrapListener {
DIAGNOSTICS,
DECOMMISSION,
PING,
- IS_LOADED
+ IS_LOADED,
+ STATUS_HISTORY
}
private final RequestType requestType;
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
index c70f6b9..72efa0d 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
@@ -27,6 +27,7 @@ import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.controller.UninheritableFlowException;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
+import org.apache.nifi.controller.status.history.StatusHistoryDumpFactory;
import org.apache.nifi.diagnostics.DiagnosticsDump;
import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
import org.apache.nifi.diagnostics.DiagnosticsFactory;
@@ -165,6 +166,7 @@ public class JettyServer implements NiFiServer,
ExtensionUiLoader {
private DiagnosticsFactory diagnosticsFactory;
private SslContextFactory.Server sslContextFactory;
private DecommissionTask decommissionTask;
+ private StatusHistoryDumpFactory statusHistoryDumpFactory;
private WebAppContext webApiContext;
private WebAppContext webDocsContext;
@@ -631,7 +633,7 @@ public class JettyServer implements NiFiServer,
ExtensionUiLoader {
XSSProtectionFilter.class,
XContentTypeOptionsFilter.class));
- if(props.isHTTPSConfigured()) {
+ if (props.isHTTPSConfigured()) {
filters.add(StrictTransportSecurityFilter.class);
}
filters.forEach((filter) -> addFilters(filter, webappContext));
@@ -708,7 +710,7 @@ public class JettyServer implements NiFiServer,
ExtensionUiLoader {
* @param webAppContext context to which filters will be added
* @param props the {@link NiFiProperties}
*/
- private static void addDenialOfServiceFilters(final WebAppContext
webAppContext, final NiFiProperties props) {
+ private static void addDenialOfServiceFilters(final WebAppContext
webAppContext, final NiFiProperties props) {
addWebRequestLimitingFilter(webAppContext,
props.getMaxWebRequestsPerSecond(), getWebRequestTimeoutMs(props),
props.getWebRequestIpWhitelist());
// Only add the ContentLengthFilter if the property is explicitly set
(empty by default)
@@ -737,10 +739,10 @@ public class JettyServer implements NiFiServer,
ExtensionUiLoader {
* In order to allow clients to make more requests than the maximum rate,
clients can be added to the {@code ipWhitelist}.
* The {@code requestTimeoutInMilliseconds} value limits requests to the
given request timeout amount, and will close connections that run longer than
this time.
*
- * @param webAppContext Web Application Context where Filter will be added
+ * @param webAppContext Web Application Context where Filter will be
added
* @param maxRequestsPerSec Maximum number of allowed requests per second
- * @param maxRequestMs Maximum amount of time in milliseconds before a
connection will be automatically closed
- * @param allowed Comma-separated string of IP addresses that should not
be rate limited. Does not apply to request timeout
+ * @param maxRequestMs Maximum amount of time in milliseconds before
a connection will be automatically closed
+ * @param allowed Comma-separated string of IP addresses that
should not be rate limited. Does not apply to request timeout
*/
private static void addWebRequestLimitingFilter(final WebAppContext
webAppContext, final int maxRequestsPerSec, final long maxRequestMs, final
String allowed) {
final FilterHolder holder = new FilterHolder(DoSFilter.class);
@@ -905,6 +907,7 @@ public class JettyServer implements NiFiServer,
ExtensionUiLoader {
/**
* Configures a KeyStoreScanner and TrustStoreScanner at the configured
reload intervals. This will
* reload the SSLContextFactory if any changes are detected to the
keystore or truststore.
+ *
* @param server The Jetty server
*/
private void configureSslContextFactoryReloading(Server server) {
@@ -1185,6 +1188,7 @@ public class JettyServer implements NiFiServer,
ExtensionUiLoader {
diagnosticsFactory =
webApplicationContext.getBean("diagnosticsFactory", DiagnosticsFactory.class);
decommissionTask =
webApplicationContext.getBean("decommissionTask", DecommissionTask.class);
+ statusHistoryDumpFactory =
webApplicationContext.getBean("statusHistoryDumpFactory",
StatusHistoryDumpFactory.class);
}
// ensure the web document war was loaded and provide the
extension mapping
@@ -1263,6 +1267,12 @@ public class JettyServer implements NiFiServer,
ExtensionUiLoader {
return decommissionTask;
}
+ @Override
+ public StatusHistoryDumpFactory getStatusHistoryDumpFactory() {
+ return statusHistoryDumpFactory;
+ }
+
+
private void performInjectionForComponentUis(final
Collection<WebAppContext> componentUiExtensionWebContexts,
final
NiFiWebConfigurationContext configurationContext, final FilterHolder
securityFilter) {
if (CollectionUtils.isNotEmpty(componentUiExtensionWebContexts)) {
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 75b1f0a..27e75be 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -620,6 +620,7 @@
<exclude>src/test/resources/TestLookupRecord/lookup-array-output.json</exclude>
<exclude>src/test/resources/TestValidateRecord/int-maps-schema.avsc</exclude>
<exclude>src/test/resources/TestValidateRecord/int-maps-data.json</exclude>
+
<exclude>src/test/resources/TestValidateRecord/array-and-map-with-null-element.avro</exclude>
</excludes>
</configuration>
</plugin>
diff --git a/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
b/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
index cc87079..70a0f5e 100644
--- a/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
+++ b/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
@@ -18,6 +18,7 @@ package org.apache.nifi;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.controller.DecommissionTask;
+import org.apache.nifi.controller.status.history.StatusHistoryDumpFactory;
import org.apache.nifi.diagnostics.DiagnosticsFactory;
import org.apache.nifi.nar.ExtensionMapping;
import org.apache.nifi.util.NiFiProperties;
@@ -40,4 +41,6 @@ public interface NiFiServer {
DiagnosticsFactory getThreadDumpFactory();
DecommissionTask getDecommissionTask();
+
+ StatusHistoryDumpFactory getStatusHistoryDumpFactory();
}