This is an automated email from the ASF dual-hosted git repository. aichrist pushed a commit to branch analytics-framework in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 6736574284b4bb6985a3d2508a86b54acc389f02 Author: Matthew Burgess <[email protected]> AuthorDate: Fri Aug 16 15:44:44 2019 -0400 NIFI-6510 Added property to nifi.properties - Prediction Interval for connection status analytics (#11) --- .../src/main/java/org/apache/nifi/util/NiFiProperties.java | 6 ++++++ .../src/test/resources/NiFiProperties/conf/nifi.properties | 3 +++ nifi-docs/src/main/asciidoc/administration-guide.adoc | 10 ++++++++++ .../src/test/resources/conf/nifi.properties | 3 +++ .../java/org/apache/nifi/controller/FlowController.java | 11 ++++++++++- .../analytics/CachingConnectionStatusAnalyticsEngine.java | 6 +++++- .../status/analytics/ConnectionStatusAnalytics.java | 7 ++++++- .../status/analytics/ConnectionStatusAnalyticsEngine.java | 5 ++++- .../TestCachingConnectionStatusAnalyticsEngine.java | 13 +++++++++---- .../analytics/TestConnectionStatusAnalyticsEngine.java | 5 +++-- .../status/analytics/TestStatusAnalyticsEngine.java | 7 +++++-- .../src/test/resources/conf/nifi.properties | 3 +++ .../src/test/resources/flowcontrollertest.nifi.properties | 3 +++ .../nifi-framework/nifi-resources/pom.xml | 3 +++ .../nifi-resources/src/main/resources/conf/nifi.properties | 5 ++++- 15 files changed, 77 insertions(+), 13 deletions(-) diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index 31b1db5..aa00793 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -238,6 +238,9 @@ public abstract class NiFiProperties { // expression language properties public static final String VARIABLE_REGISTRY_PROPERTIES = "nifi.variable.registry.properties"; + // analytics properties + public static final String ANALYTICS_PREDICTION_INTERVAL = "nifi.analytics.predict.interval"; + // defaults public static final Boolean DEFAULT_AUTO_RESUME_STATE = true; public static final String DEFAULT_AUTHORIZER_CONFIGURATION_FILE = "conf/authorizers.xml"; @@ -308,6 +311,9 @@ public abstract class NiFiProperties { // Kerberos defaults public static final String DEFAULT_KERBEROS_AUTHENTICATION_EXPIRATION = "12 hours"; + // analytics defaults + public static final String DEFAULT_ANALYTICS_PREDICTION_INTERVAL = "3 mins"; + /** * Retrieves the property value for the given property key. diff --git a/nifi-commons/nifi-properties/src/test/resources/NiFiProperties/conf/nifi.properties b/nifi-commons/nifi-properties/src/test/resources/NiFiProperties/conf/nifi.properties index 2c58fa9..9ab3d99 100644 --- a/nifi-commons/nifi-properties/src/test/resources/NiFiProperties/conf/nifi.properties +++ b/nifi-commons/nifi-properties/src/test/resources/NiFiProperties/conf/nifi.properties @@ -120,3 +120,6 @@ nifi.cluster.manager.node.api.request.threads=10 nifi.cluster.manager.flow.retrieval.delay=5 sec nifi.cluster.manager.protocol.threads=10 nifi.cluster.manager.safemode.duration=0 sec + +# analytics properties # +nifi.analytics.predict.interval=3 mins \ No newline at end of file diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index 57728ea..6fccde9 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -3317,6 +3317,16 @@ that is specified. |`nifi.kerberos.spengo.authentication.expiration`*|The expiration duration of a successful Kerberos user authentication, if used. The default value is `12 hours`. |==== +[[analytics_properties]] +=== Analytics Properties + +These properties determine the behavior of the internal NiFi Analytics capability, such as backpressure prediction, and should be configured the same way on all nodes. + +|==== +|*Property*|*Description* +|`nifi.analytics.predict.interval`|This indicates a time interval for which analytical predictions (queue saturation, e.g.) should be made. The default value is `3 mins`. +|==== + [[custom_properties]] === Custom Properties diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/resources/conf/nifi.properties index a768adc..4d139d7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/resources/conf/nifi.properties @@ -121,3 +121,6 @@ nifi.cluster.manager.node.api.request.threads=10 nifi.cluster.manager.flow.retrieval.delay=5 sec nifi.cluster.manager.protocol.threads=10 nifi.cluster.manager.safemode.duration=0 sec + +# analytics properties # +nifi.analytics.predict.interval=3 mins \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index bf1d06c..55ff0bb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -593,8 +593,17 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node zooKeeperStateServer = null; } + // Determine interval for predicting future feature values + final String predictionInterval = nifiProperties.getProperty(NiFiProperties.ANALYTICS_PREDICTION_INTERVAL, NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL); + long predictionIntervalMillis; + try { + predictionIntervalMillis = FormatUtils.getTimeDuration(predictionInterval, TimeUnit.MILLISECONDS); + } catch (final Exception e) { + predictionIntervalMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL, TimeUnit.MILLISECONDS); + } + componentStatusRepository = createComponentStatusRepository(); - analyticsEngine = new ConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository,flowFileEventRepository); + analyticsEngine = new ConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, flowFileEventRepository, predictionIntervalMillis); eventAccess = new StandardEventAccess(this, flowFileEventRepository); timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java index 747c496..95f655e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java @@ -32,15 +32,18 @@ public class CachingConnectionStatusAnalyticsEngine implements StatusAnalyticsEn private final FlowManager flowManager; private final FlowFileEventRepository flowFileEventRepository; private volatile Cache<String, ConnectionStatusAnalytics> cache; + private final long predictionIntervalMillis; private static final Logger LOG = LoggerFactory.getLogger(CachingConnectionStatusAnalyticsEngine.class); - public CachingConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository, FlowFileEventRepository flowFileEventRepository) { + public CachingConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository, + FlowFileEventRepository flowFileEventRepository, long predictionIntervalMillis) { this.flowManager = flowManager; this.statusRepository = statusRepository; this.flowFileEventRepository = flowFileEventRepository; this.cache = Caffeine.newBuilder() .expireAfterWrite(30, TimeUnit.MINUTES) .build(); + this.predictionIntervalMillis = predictionIntervalMillis; } @Override @@ -50,6 +53,7 @@ public class CachingConnectionStatusAnalyticsEngine implements StatusAnalyticsEn if (connectionStatusAnalytics == null) { LOG.debug("Creating new status analytics object for connection id: {}", identifier); connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository, flowManager,flowFileEventRepository, identifier, true); + connectionStatusAnalytics.setIntervalTimeMillis(predictionIntervalMillis); connectionStatusAnalytics.init(); cache.put(identifier, connectionStatusAnalytics); } else { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java index aa19b1d..313e7ab 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java @@ -59,6 +59,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { private final FlowManager flowManager; private final Boolean supportOnlineLearning; private Boolean extendWindow = false; + private long intervalMillis = 3L * 60 * 1000; // Default is 3 minutes private static double SCORE_THRESHOLD = .90; public ConnectionStatusAnalytics(ComponentStatusRepository componentStatusRepository, FlowManager flowManager, FlowFileEventRepository flowFileEventRepository, String connectionIdentifier, @@ -255,7 +256,11 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { } public Long getIntervalTimeMillis() { - return 3L * 60 * 1000; + return intervalMillis; + } + + public void setIntervalTimeMillis(long intervalTimeMillis) { + this.intervalMillis = intervalTimeMillis; } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java index 6f261a0..a9ba4ea 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java @@ -27,16 +27,19 @@ public class ConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine { private final ComponentStatusRepository statusRepository; private final FlowManager flowManager; private final FlowFileEventRepository flowFileEventRepository; + private final long predictionIntervalMillis; - public ConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository, FlowFileEventRepository flowFileEventRepository) { + public ConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository, FlowFileEventRepository flowFileEventRepository, long predictionIntervalMillis) { this.flowManager = flowManager; this.statusRepository = statusRepository; this.flowFileEventRepository = flowFileEventRepository; + this.predictionIntervalMillis = predictionIntervalMillis; } @Override public StatusAnalytics getStatusAnalytics(String identifier) { ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository, flowManager, flowFileEventRepository, identifier, false); + connectionStatusAnalytics.setIntervalTimeMillis(predictionIntervalMillis); connectionStatusAnalytics.init(); return connectionStatusAnalytics; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java index 77ffa9b..2c86bfc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java @@ -21,20 +21,25 @@ import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.status.history.ComponentStatusRepository; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + public class TestCachingConnectionStatusAnalyticsEngine extends TestStatusAnalyticsEngine { @Override - public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository, ComponentStatusRepository componentStatusRepository) { - return new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, flowFileEventRepository); + public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository, + ComponentStatusRepository componentStatusRepository, long predictIntervalMillis) { + return new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, flowFileEventRepository, predictIntervalMillis); } @Test public void testCachedStatusAnalytics() { - StatusAnalyticsEngine statusAnalyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository); + StatusAnalyticsEngine statusAnalyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository, DEFAULT_PREDICT_INTERVAL_MILLIS); StatusAnalytics statusAnalyticsA = statusAnalyticsEngine.getStatusAnalytics("A"); StatusAnalytics statusAnalyticsB = statusAnalyticsEngine.getStatusAnalytics("B"); StatusAnalytics statusAnalyticsTest = statusAnalyticsEngine.getStatusAnalytics("A"); - assert (statusAnalyticsA.equals(statusAnalyticsTest)); + assertEquals(statusAnalyticsA, statusAnalyticsTest); + assertNotEquals(statusAnalyticsB, statusAnalyticsTest); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java index 99c9fae..172c3b5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java @@ -23,8 +23,9 @@ import org.apache.nifi.controller.status.history.ComponentStatusRepository; public class TestConnectionStatusAnalyticsEngine extends TestStatusAnalyticsEngine { @Override - public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository, ComponentStatusRepository statusRepository) { - return new ConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository); + public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository, + ComponentStatusRepository statusRepository, long predictIntervalMillis) { + return new ConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository, DEFAULT_PREDICT_INTERVAL_MILLIS); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java index ee3d4e5..f666fdf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java @@ -36,6 +36,8 @@ import org.mockito.Mockito; public abstract class TestStatusAnalyticsEngine { + static final long DEFAULT_PREDICT_INTERVAL_MILLIS = 3L * 60 * 1000; + protected ComponentStatusRepository statusRepository; protected FlowManager flowManager; protected FlowFileEventRepository flowFileEventRepository; @@ -55,11 +57,12 @@ public abstract class TestStatusAnalyticsEngine { @Test public void testGetStatusAnalytics() { - StatusAnalyticsEngine statusAnalyticsEngine = getStatusAnalyticsEngine(flowManager,flowFileEventRepository, statusRepository); + StatusAnalyticsEngine statusAnalyticsEngine = getStatusAnalyticsEngine(flowManager,flowFileEventRepository, statusRepository, DEFAULT_PREDICT_INTERVAL_MILLIS); StatusAnalytics statusAnalytics = statusAnalyticsEngine.getStatusAnalytics("1"); assertNotNull(statusAnalytics); } - public abstract StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository, ComponentStatusRepository componentStatusRepository); + public abstract StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository, + ComponentStatusRepository componentStatusRepository, long predictIntervalMillis); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/nifi.properties index cc8f098..3d9df16 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/nifi.properties @@ -119,3 +119,6 @@ nifi.cluster.manager.node.api.request.threads=10 nifi.cluster.manager.flow.retrieval.delay=5 sec nifi.cluster.manager.protocol.threads=10 nifi.cluster.manager.safemode.duration=0 sec + +# analytics properties # +nifi.analytics.predict.interval=3 mins \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flowcontrollertest.nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flowcontrollertest.nifi.properties index a4c1a4a..9e6aecb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flowcontrollertest.nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flowcontrollertest.nifi.properties @@ -127,3 +127,6 @@ nifi.cluster.manager.node.api.request.threads=10 nifi.cluster.manager.flow.retrieval.delay=5 sec nifi.cluster.manager.protocol.threads=10 nifi.cluster.manager.safemode.duration=0 sec + +# analytics properties # +nifi.analytics.predict.interval=3 mins \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml index 91ab8bd..15ac294 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml @@ -208,6 +208,9 @@ <nifi.kerberos.spnego.principal /> <nifi.kerberos.spnego.keytab.location /> <nifi.kerberos.spnego.authentication.expiration>12 hours</nifi.kerberos.spnego.authentication.expiration> + + <!-- nifi.properties: analytics properties --> + <nifi.analytics.predict.interval>3 mins</nifi.analytics.predict.interval> </properties> <build> <plugins> diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index 556d783..e6140fa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -255,4 +255,7 @@ nifi.kerberos.spnego.authentication.expiration=${nifi.kerberos.spnego.authentica # external properties files for variable registry # supports a comma delimited list of file locations -nifi.variable.registry.properties= \ No newline at end of file +nifi.variable.registry.properties= + +# analytics properties # +nifi.analytics.predict.interval=${nifi.analytics.predict.interval} \ No newline at end of file
