This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 8e1452a NIFI-6649 - added separate query interval configuration for
observation queries NIFI-6649 - documentation update
8e1452a is described below
commit 8e1452a3f342ee45dfd589d343c9ecf6a7cf6825
Author: Yolanda M. Davis <[email protected]>
AuthorDate: Tue Sep 10 15:54:42 2019 -0400
NIFI-6649 - added separate query interval configuration for observation
queries
NIFI-6649 - documentation update
NIFI-6649 - add debug logging for score and prediction information
NIFI-6649 - fix to ensure counts return minimum value of 0 if not infinite
or NaN
Signed-off-by: Matthew Burgess <[email protected]>
This closes #3719
---
.../java/org/apache/nifi/util/NiFiProperties.java | 2 ++
.../src/main/asciidoc/administration-guide.adoc | 3 ++-
.../org/apache/nifi/controller/FlowController.java | 13 ++++++++++-
.../CachingConnectionStatusAnalyticsEngine.java | 5 ++--
.../analytics/ConnectionStatusAnalytics.java | 27 ++++++++++++++++++++--
.../analytics/ConnectionStatusAnalyticsEngine.java | 6 ++++-
...TestCachingConnectionStatusAnalyticsEngine.java | 8 ++++---
.../TestConnectionStatusAnalyticsEngine.java | 5 ++--
.../analytics/TestStatusAnalyticsEngine.java | 5 ++--
.../nifi-framework/nifi-resources/pom.xml | 1 +
.../src/main/resources/conf/nifi.properties | 1 +
11 files changed, 62 insertions(+), 14 deletions(-)
diff --git
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 7da514c..afcd268 100644
---
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -241,6 +241,7 @@ public abstract class NiFiProperties {
// analytics properties
public static final String ANALYTICS_PREDICTION_ENABLED =
"nifi.analytics.predict.enabled";
public static final String ANALYTICS_PREDICTION_INTERVAL =
"nifi.analytics.predict.interval";
+ public static final String ANALYTICS_QUERY_INTERVAL =
"nifi.analytics.query.interval";
public static final String ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION =
"nifi.analytics.connection.model.implementation";
public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME=
"nifi.analytics.connection.model.score.name";
public static final String ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD =
"nifi.analytics.connection.model.score.threshold";
@@ -318,6 +319,7 @@ public abstract class NiFiProperties {
// analytics defaults
public static final String DEFAULT_ANALYTICS_PREDICTION_ENABLED = "false";
public static final String DEFAULT_ANALYTICS_PREDICTION_INTERVAL = "3
mins";
+ public static final String DEFAULT_ANALYTICS_QUERY_INTERVAL = "3 mins";
public final static String
DEFAULT_ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION =
"org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares";
public static final String DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME =
"rSquared";
public static final double DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD =
.90;
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc
b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 432ca9e..8c5cac9 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -2389,7 +2389,7 @@ In order to generate predictions, local status snapshot
history is queried to ob
NiFi evaluates the model's effectiveness before sending prediction information
by using the model's R-Squared score by default. One important note: R-Square
is a measure of how close the regression line fits the observation data vs. how
accurate the prediction will be; therefore there may be some measure of error.
If the R-Squared score for the calculated model meets the configured threshold
(as defined by `nifi.analytics.connection.model.score.threshold`) then the
model will be used for [...]
-The prediction interval `nifi.analytics.predict.interval` can be configured to
project out further when back pressure will occur. Predictions further out in
time require more observations stored locally to generate an effective model.
This may also require tuning of the model's scoring threshold value to select a
score which can offer reasonable predictions.
+The prediction interval `nifi.analytics.predict.interval` can be configured to
project out further when back pressure will occur. The prediction query
interval `nifi.analytics.query.interval` can also be configured to determine
how far back in time past observations should be queried in order to generate
the model. Adjustments to these settings may require tuning of the model's
scoring threshold value to select a score that can offer reasonable predictions.
See <<analytics_properties>> for complete information on configuring analytic
properties.
@@ -3341,6 +3341,7 @@ These properties determine the behavior of the internal
NiFi predictive analytic
|*Property*|*Description*
|`nifi.analytics.predict.enabled`|This indicates whether prediction should be
enabled for the cluster. The default is `false`.
|`nifi.analytics.predict.interval`|This indicates a time interval for which
analytical predictions (queue saturation, e.g.) should be made. The default
value is `3 mins`.
+|`nifi.analytics.query.interval`|This indicates a time interval to query for
past observations (e.g. the last 3 minutes of snapshots). The default value is
`3 mins`. NOTE: This value should be at least 3 times greater than
`nifi.components.status.snapshot.frequency` to ensure enough observations are
retrieved for predictions.
|`nifi.analytics.connection.model.implementation`|This is the implementation
class for the status analytics model used to make connection predictions. The
default value is
`org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares`.
|`nifi.analytics.connection.model.score.name`|This is the name of the scoring
type that should be used to evaluate model. The default value is `rSquared`.
|`nifi.analytics.connection.model.score.threshold`|This is the threshold for
the scoring value (where model score should be above given threshold). The
default value is `.9`.
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 98b4395..c6f942b 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -614,6 +614,17 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
predictionIntervalMillis =
FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL,
TimeUnit.MILLISECONDS);
}
+ // Determine interval for querying past observations
+ final String queryInterval =
nifiProperties.getProperty(NiFiProperties.ANALYTICS_QUERY_INTERVAL,
NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL);
+ long queryIntervalMillis;
+ try {
+ queryIntervalMillis =
FormatUtils.getTimeDuration(queryInterval, TimeUnit.MILLISECONDS);
+ } catch (final Exception e) {
+ LOG.warn("Analytics is enabled however could not retrieve
value for "+ NiFiProperties.ANALYTICS_QUERY_INTERVAL + ". This property has
been set to '"
+ + NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL +
"'");
+ queryIntervalMillis =
FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL,
TimeUnit.MILLISECONDS);
+ }
+
// Determine score name to use for evaluating model performance
String modelScoreName =
nifiProperties.getProperty(NiFiProperties.ANALYTICS_CONNECTION_MODEL_SCORE_NAME,
NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME);
@@ -632,7 +643,7 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
.getConnectionStatusModelMap(extensionManager,
nifiProperties);
analyticsEngine = new
CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository,
flowFileEventRepository, modelMap,
- predictionIntervalMillis, modelScoreName,
modelScoreThreshold);
+ predictionIntervalMillis, queryIntervalMillis,
modelScoreName, modelScoreThreshold);
}
eventAccess = new StandardEventAccess(this, flowFileEventRepository);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
index 3588ed5..aa67811 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
@@ -40,9 +40,10 @@ public class CachingConnectionStatusAnalyticsEngine extends
ConnectionStatusAnal
public CachingConnectionStatusAnalyticsEngine(FlowManager flowManager,
ComponentStatusRepository statusRepository,
FlowFileEventRepository flowFileEventRepository, Map<String,
Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap,
- long predictionIntervalMillis, String scoreName, double
scoreThreshold) {
+ long predictionIntervalMillis, long queryIntervalMillis, String
scoreName, double scoreThreshold) {
-
super(flowManager,statusRepository,flowFileEventRepository,modelMap,predictionIntervalMillis,scoreName,scoreThreshold);
+ super(flowManager, statusRepository, flowFileEventRepository,
modelMap, predictionIntervalMillis,
+ queryIntervalMillis, scoreName, scoreThreshold);
this.cache = Caffeine.newBuilder()
.expireAfterWrite(30, TimeUnit.MINUTES)
.build();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
index 4bf1948..7abba4b 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
@@ -24,6 +24,7 @@ import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.stream.Stream;
+import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.flow.FlowManager;
@@ -56,6 +57,7 @@ public class ConnectionStatusAnalytics implements
StatusAnalytics {
private final Boolean supportOnlineLearning;
private Boolean extendWindow = false;
private long intervalMillis = 3L * 60 * 1000; // Default is 3 minutes
+ private long queryIntervalMillis = 3L * 60 * 1000; //Default is 3 minutes
private String scoreName = "rSquared";
private double scoreThreshold = .90;
@@ -78,7 +80,7 @@ public class ConnectionStatusAnalytics implements
StatusAnalytics {
//Obtain latest observations when available, extend window if
needed to obtain minimum observations
this.queryWindow = new QueryWindow(extendWindow ?
queryWindow.getStartTimeMillis() : queryWindow.getEndTimeMillis(),
System.currentTimeMillis());
} else {
- this.queryWindow = new QueryWindow(System.currentTimeMillis() -
getIntervalTimeMillis(), System.currentTimeMillis());
+ this.queryWindow = new QueryWindow(System.currentTimeMillis() -
getQueryIntervalMillis(), System.currentTimeMillis());
}
modelMap.forEach((metric, modelFunction) -> {
@@ -94,6 +96,13 @@ public class ConnectionStatusAnalytics implements
StatusAnalytics {
try {
LOG.debug("Refreshing model with new data for connection
id: {} ", connectionIdentifier);
model.learn(Stream.of(features), Stream.of(values));
+
+ if(MapUtils.isNotEmpty(model.getScores())){
+ model.getScores().forEach((key, value) -> {
+ LOG.debug("Model Scores for prediction metric {}
for connection id {}: {}={} ", metric, connectionIdentifier, key, value);
+ });
+ }
+
extendWindow = false;
} catch (Exception ex) {
LOG.debug("Exception encountered while training model for
connection id {}: {}", connectionIdentifier, ex.getMessage());
@@ -137,6 +146,7 @@ public class ConnectionStatusAnalytics implements
StatusAnalytics {
predictFeatures.put(1, inOutRatio);
return convertTimePrediction(bytesModel.predictVariable(0,
predictFeatures, backPressureBytes), System.currentTimeMillis());
} else {
+ LOG.debug("Model is not valid for calculating time back pressure
by content size in bytes. Returning -1");
return -1L;
}
}
@@ -164,6 +174,7 @@ public class ConnectionStatusAnalytics implements
StatusAnalytics {
predictFeatures.put(1, inOutRatio);
return convertTimePrediction(countModel.predictVariable(0,
predictFeatures, backPressureCountThreshold), System.currentTimeMillis());
} else {
+ LOG.debug("Model is not valid for calculating time to back
pressure by object count. Returning -1");
return -1L;
}
}
@@ -186,6 +197,7 @@ public class ConnectionStatusAnalytics implements
StatusAnalytics {
predictFeatures.add(inOutRatio);
return
convertCountPrediction(bytesModel.predict(predictFeatures.toArray(new
Double[2])));
} else {
+ LOG.debug("Model is not valid for predicting content size in bytes
for next interval. Returning -1");
return -1L;
}
}
@@ -208,6 +220,7 @@ public class ConnectionStatusAnalytics implements
StatusAnalytics {
predictFeatures.add(inOutRatio);
return
convertCountPrediction(countModel.predict(predictFeatures.toArray(new
Double[2])));
} else {
+ LOG.debug("Model is not valid for predicting object count for next
interval. Returning -1");
return -1L;
}
@@ -266,6 +279,14 @@ public class ConnectionStatusAnalytics implements
StatusAnalytics {
this.intervalMillis = intervalTimeMillis;
}
+ public long getQueryIntervalMillis() {
+ return queryIntervalMillis;
+ }
+
+ public void setQueryIntervalMillis(long queryIntervalMillis) {
+ this.queryIntervalMillis = queryIntervalMillis;
+ }
+
public String getScoreName() {
return scoreName;
}
@@ -334,6 +355,7 @@ public class ConnectionStatusAnalytics implements
StatusAnalytics {
*/
private Long convertTimePrediction(Double prediction, Long timeMillis) {
if (Double.isNaN(prediction) || Double.isInfinite(prediction) ||
prediction < timeMillis) {
+ LOG.debug("Time prediction value is invalid: {}. Returning
-1.",prediction);
return -1L;
} else {
return Math.max(0, Math.round(prediction) - timeMillis);
@@ -346,7 +368,8 @@ public class ConnectionStatusAnalytics implements
StatusAnalytics {
* @return prediction prediction value converted into valid value for
consumption
*/
private Long convertCountPrediction(Double prediction) {
- if (Double.isNaN(prediction) || Double.isInfinite(prediction) ||
prediction < 0) {
+ if (Double.isNaN(prediction) || Double.isInfinite(prediction)) {
+ LOG.debug("Count prediction value is invalid: {}. Returning
-1.",prediction);
return -1L;
} else {
return Math.max(0, Math.round(prediction));
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
index 17a2704..e7eecac 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
@@ -37,16 +37,19 @@ public class ConnectionStatusAnalyticsEngine implements
StatusAnalyticsEngine {
protected final FlowFileEventRepository flowFileEventRepository;
protected final Map<String, Tuple<StatusAnalyticsModel,
StatusMetricExtractFunction>> modelMap;
protected final long predictionIntervalMillis;
+ protected final long queryIntervalMillis;
protected final String scoreName;
protected final double scoreThreshold;
public ConnectionStatusAnalyticsEngine(FlowManager flowManager,
ComponentStatusRepository statusRepository, FlowFileEventRepository
flowFileEventRepository,
- Map<String, Tuple<StatusAnalyticsModel,
StatusMetricExtractFunction>> modelMap, long predictionIntervalMillis, String
scoreName, double scoreThreshold) {
+ Map<String,
Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap, long
predictionIntervalMillis,
+ long queryIntervalMillis, String
scoreName, double scoreThreshold) {
this.flowManager = flowManager;
this.statusRepository = statusRepository;
this.flowFileEventRepository = flowFileEventRepository;
this.predictionIntervalMillis = predictionIntervalMillis;
this.modelMap = modelMap;
+ this.queryIntervalMillis = queryIntervalMillis;
this.scoreName = scoreName;
this.scoreThreshold = scoreThreshold;
}
@@ -60,6 +63,7 @@ public class ConnectionStatusAnalyticsEngine implements
StatusAnalyticsEngine {
public StatusAnalytics getStatusAnalytics(String identifier) {
ConnectionStatusAnalytics connectionStatusAnalytics = new
ConnectionStatusAnalytics(statusRepository, flowManager,
flowFileEventRepository, modelMap, identifier, false);
connectionStatusAnalytics.setIntervalTimeMillis(predictionIntervalMillis);
+ connectionStatusAnalytics.setQueryIntervalMillis(queryIntervalMillis);
connectionStatusAnalytics.setScoreName(scoreName);
connectionStatusAnalytics.setScoreThreshold(scoreThreshold);
connectionStatusAnalytics.refresh();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java
index 57b300f..7be0b8a 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java
@@ -33,15 +33,17 @@ public class TestCachingConnectionStatusAnalyticsEngine
extends TestStatusAnalyt
public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager
flowManager, FlowFileEventRepository flowFileEventRepository,
ComponentStatusRepository componentStatusRepository,
Map<String,
Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap,
- long
predictIntervalMillis, String scoreName, double scoreThreshold) {
+ long
predictIntervalMillis, long queryIntervalMillis, String scoreName, double
scoreThreshold) {
- return new CachingConnectionStatusAnalyticsEngine(flowManager,
componentStatusRepository, flowFileEventRepository, modelMap,
predictIntervalMillis, scoreName, scoreThreshold);
+ return new CachingConnectionStatusAnalyticsEngine(flowManager,
componentStatusRepository, flowFileEventRepository, modelMap,
predictIntervalMillis,
+
queryIntervalMillis, scoreName, scoreThreshold);
}
@Test
public void testCachedStatusAnalytics() {
StatusAnalyticsEngine statusAnalyticsEngine = new
CachingConnectionStatusAnalyticsEngine(flowManager, statusRepository,
flowFileEventRepository, modelMap,
-
DEFAULT_PREDICT_INTERVAL_MILLIS, DEFAULT_SCORE_NAME,
DEFAULT_SCORE_THRESHOLD);
+
DEFAULT_PREDICT_INTERVAL_MILLIS,
DEFAULT_QUERY_INTERVAL_MILLIS,
+
DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD);
StatusAnalytics statusAnalyticsA =
statusAnalyticsEngine.getStatusAnalytics("A");
StatusAnalytics statusAnalyticsB =
statusAnalyticsEngine.getStatusAnalytics("B");
StatusAnalytics statusAnalyticsTest =
statusAnalyticsEngine.getStatusAnalytics("A");
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java
index 0b5612d..eb56129 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java
@@ -28,8 +28,9 @@ public class TestConnectionStatusAnalyticsEngine extends
TestStatusAnalyticsEngi
@Override
public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager
flowManager, FlowFileEventRepository flowFileEventRepository,
ComponentStatusRepository statusRepository, Map<String,
Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap,
- long
predictIntervalMillis, String scoreName, double scoreThreshold) {
- return new ConnectionStatusAnalyticsEngine(flowManager,
statusRepository, flowFileEventRepository,modelMap,
DEFAULT_PREDICT_INTERVAL_MILLIS, scoreName, scoreThreshold);
+ long
predictIntervalMillis, long queryIntervalMillis, String scoreName, double
scoreThreshold) {
+ return new ConnectionStatusAnalyticsEngine(flowManager,
statusRepository, flowFileEventRepository,modelMap,
+
DEFAULT_PREDICT_INTERVAL_MILLIS, DEFAULT_QUERY_INTERVAL_MILLIS, scoreName,
scoreThreshold);
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java
index 477216c..f1dc0be 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java
@@ -43,6 +43,7 @@ import org.mockito.stubbing.Answer;
public abstract class TestStatusAnalyticsEngine {
static final long DEFAULT_PREDICT_INTERVAL_MILLIS = 3L * 60 * 1000;
+ static final long DEFAULT_QUERY_INTERVAL_MILLIS = 3L * 60 * 1000;
static final String DEFAULT_SCORE_NAME = "rSquared";
static final double DEFAULT_SCORE_THRESHOLD = .9;
@@ -89,13 +90,13 @@ public abstract class TestStatusAnalyticsEngine {
@Test
public void testGetStatusAnalytics() {
StatusAnalyticsEngine statusAnalyticsEngine =
getStatusAnalyticsEngine(flowManager,flowFileEventRepository, statusRepository,
modelMap, DEFAULT_PREDICT_INTERVAL_MILLIS,
-
DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD);
+
DEFAULT_QUERY_INTERVAL_MILLIS, DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD);
StatusAnalytics statusAnalytics =
statusAnalyticsEngine.getStatusAnalytics("1");
assertNotNull(statusAnalytics);
}
public abstract StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager
flowManager, FlowFileEventRepository flowFileEventRepository,
ComponentStatusRepository componentStatusRepository, Map<String,
Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap,
- long
predictIntervalMillis, String scoreName, double scoreThreshold);
+ long
predictIntervalMillis, long queryIntervalMillis, String scoreName, double
scoreThreshold);
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index fe69177..56bd941 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -212,6 +212,7 @@
<!-- nifi.properties: analytics properties -->
<nifi.analytics.predict.enabled>false</nifi.analytics.predict.enabled>
<nifi.analytics.predict.interval>3
mins</nifi.analytics.predict.interval>
+ <nifi.analytics.query.interval>3 mins</nifi.analytics.query.interval>
<nifi.analytics.connection.model.implementation>org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares</nifi.analytics.connection.model.implementation>
<nifi.analytics.connection.model.score.name>rSquared</nifi.analytics.connection.model.score.name>
<nifi.analytics.connection.model.score.threshold>.90</nifi.analytics.connection.model.score.threshold>
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 3df0005..617e722 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -260,6 +260,7 @@ nifi.variable.registry.properties=
# analytics properties #
nifi.analytics.predict.enabled=${nifi.analytics.predict.enabled}
nifi.analytics.predict.interval=${nifi.analytics.predict.interval}
+nifi.analytics.query.interval=${nifi.analytics.query.interval}
nifi.analytics.connection.model.implementation=${nifi.analytics.connection.model.implementation}
nifi.analytics.connection.model.score.name=${nifi.analytics.connection.model.score.name}
nifi.analytics.connection.model.score.threshold=${nifi.analytics.connection.model.score.threshold}