This is an automated email from the ASF dual-hosted git repository. aichrist pushed a commit to branch analytics-framework in repository https://gitbox.apache.org/repos/asf/nifi.git
commit c98ae35ddcdbbb56e79fdedd2f39bb21ef8d417e Author: Yolanda Davis <[email protected]> AuthorDate: Wed Jul 31 08:57:15 2019 -0400 NIFI-6510 Changes to inject flowManager instead of flow controller, also changes to properly reflect when predictions can be made vs not. (cherry picked from commit 6fae058) --- .../org/apache/nifi/controller/FlowController.java | 2 +- .../CachingConnectionStatusAnalyticsEngine.java | 16 ++-- .../analytics/ConnectionStatusAnalytics.java | 89 +++++++++++++--------- .../analytics/ConnectionStatusAnalyticsEngine.java | 11 ++- .../status/analytics/SimpleRegressionBSAM.java | 18 ++++- 5 files changed, 81 insertions(+), 55 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index b5de777..141a6a4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -594,7 +594,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node } componentStatusRepository = createComponentStatusRepository(); - analyticsEngine = new CachingConnectionStatusAnalyticsEngine(this, componentStatusRepository); + analyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository); eventAccess = new StandardEventAccess(this, flowFileEventRepository); timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java index c4836c6..f69ed33 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java @@ -18,7 +18,7 @@ package org.apache.nifi.controller.status.analytics; import java.util.concurrent.TimeUnit; -import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.status.history.ComponentStatusRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,15 +28,15 @@ import com.github.benmanes.caffeine.cache.Caffeine; public class CachingConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine { private ComponentStatusRepository statusRepository; - private FlowController controller; + private FlowManager flowManager; private volatile Cache<String, ConnectionStatusAnalytics> cache; private static final Logger LOG = LoggerFactory.getLogger(CachingConnectionStatusAnalyticsEngine.class); - public CachingConnectionStatusAnalyticsEngine(FlowController controller, ComponentStatusRepository statusRepository) { - this.controller = controller; + public CachingConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository) { + this.flowManager = flowManager; this.statusRepository = statusRepository; this.cache = Caffeine.newBuilder() - .expireAfterWrite(5, TimeUnit.MINUTES) + .expireAfterWrite(30, TimeUnit.MINUTES) .build(); } @@ -45,12 +45,12 @@ public class CachingConnectionStatusAnalyticsEngine implements StatusAnalyticsEn ConnectionStatusAnalytics connectionStatusAnalytics = cache.getIfPresent(identifier); if(connectionStatusAnalytics == null){ - LOG.info("Creating new analytics for connection id: {0}", identifier); - connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,controller,identifier); + LOG.debug("Creating new status analytics object for connection id: {}", identifier); + connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,flowManager,identifier,true); connectionStatusAnalytics.init(); cache.put(identifier,connectionStatusAnalytics); }else{ - LOG.info("Pulled existing analytics from cache for connection id: {}", identifier); + LOG.debug("Pulled existing analytics from cache for connection id: {}", identifier); connectionStatusAnalytics.refresh(); } return connectionStatusAnalytics; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java index bad2ff1..8b7964e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java @@ -25,7 +25,7 @@ import java.util.Optional; import java.util.stream.Stream; import org.apache.nifi.connectable.Connection; -import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.status.history.ComponentStatusRepository; import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor; import org.apache.nifi.controller.status.history.StatusHistory; @@ -45,24 +45,27 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { private QueryWindow queryWindow; private final ComponentStatusRepository componentStatusRepository; private final String connectionIdentifier; - private final FlowController flowController; + private final FlowManager flowManager; + private final Boolean supportOnlineLearning; - public ConnectionStatusAnalytics(ComponentStatusRepository componentStatusRepository, FlowController flowController, String connectionIdentifier) { + public ConnectionStatusAnalytics(ComponentStatusRepository componentStatusRepository, FlowManager flowManager, String connectionIdentifier, Boolean supportOnlineLearning) { this.componentStatusRepository = componentStatusRepository; - this.flowController = flowController; + this.flowManager = flowManager; this.connectionIdentifier = connectionIdentifier; + this.supportOnlineLearning = supportOnlineLearning; } public void init() { + LOG.debug("Initialize analytics connection id: {} ", connectionIdentifier); + if (this.modelMap == null || this.modelMap.isEmpty()) { - Tuple<StatusAnalyticsModel, ExtractFunction> countModelFunction = new Tuple<>(new SimpleRegressionBSAM(), extract); - Tuple<StatusAnalyticsModel, ExtractFunction> byteModelFunction = new Tuple<>(new SimpleRegressionBSAM(), extract); + Tuple<StatusAnalyticsModel, ExtractFunction> countModelFunction = new Tuple<>(new SimpleRegressionBSAM(!supportsOnlineLearning()), extract); + Tuple<StatusAnalyticsModel, ExtractFunction> byteModelFunction = new Tuple<>(new SimpleRegressionBSAM(!supportsOnlineLearning()), extract); this.modelMap = new HashMap<>(); //TODO: Should change keys used here this.modelMap.put(ConnectionStatusDescriptor.QUEUED_COUNT.getField(), countModelFunction); this.modelMap.put(ConnectionStatusDescriptor.QUEUED_BYTES.getField(), byteModelFunction); - this.queryWindow = new QueryWindow(System.currentTimeMillis() - (5 * 60 * 1000), System.currentTimeMillis()); } refresh(); @@ -70,17 +73,20 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { public void refresh() { + + LOG.debug("Refreshing model with new data for connection id: {} ", connectionIdentifier); + + this.queryWindow = new QueryWindow(System.currentTimeMillis() - getIntervalTimeMillis(), System.currentTimeMillis()); modelMap.forEach((metric, modelFunction) -> { StatusAnalyticsModel model = modelFunction.getKey(); ExtractFunction extract = modelFunction.getValue(); StatusHistory statusHistory = componentStatusRepository.getConnectionStatusHistory(connectionIdentifier, queryWindow.getStartDateTime(), queryWindow.getEndDateTime(), Integer.MAX_VALUE); Tuple<Stream<Double>, Stream<Double>> modelData = extract.extractMetric(metric, statusHistory); - LOG.info("Refreshing model for connection id: {} ", connectionIdentifier); Stream<Double> times = modelData.getKey(); - Stream<Double> counts = modelData.getValue(); - //times is the X axis and counts is on the y axis - model.learn(times, counts); + Stream<Double> values = modelData.getValue(); + //times are the X axis and values are on the y axis + model.learn(times, values); }); } @@ -90,7 +96,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { * * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue. */ - public long getTimeToBytesBackpressureMillis() { + public Long getTimeToBytesBackpressureMillis() { final BivariateStatusAnalyticsModel bytesModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey(); final Connection connection = getConnection(); @@ -100,10 +106,10 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold(); final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B); final double prediction = bytesModel.predictX(backPressureBytes); - if (prediction != Double.NaN) { + if (prediction != Double.NaN && prediction >= System.currentTimeMillis() && !Double.isInfinite(prediction)) { return Math.max(0, Math.round(prediction) - System.currentTimeMillis()); } else { - return Long.MAX_VALUE; + return -1L; } } @@ -113,7 +119,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { * * @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue. */ - public long getTimeToCountBackpressureMillis() { + public Long getTimeToCountBackpressureMillis() { final BivariateStatusAnalyticsModel countModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey(); final Connection connection = getConnection(); @@ -123,10 +129,10 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold(); final Double prediction = countModel.predictX(backPressureCountThreshold); - if (prediction != Double.NaN) { + if (prediction != Double.NaN && prediction >= System.currentTimeMillis() && !Double.isInfinite(prediction)) { return Math.max(0, Math.round(prediction) - System.currentTimeMillis()); } else { - return Long.MAX_VALUE; + return -1L; } } @@ -136,13 +142,13 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue. */ - public long getNextIntervalBytes() { + public Long getNextIntervalBytes() { final BivariateStatusAnalyticsModel bytesModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey(); final Double prediction = bytesModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis()); - if (prediction != Double.NaN) { + if (prediction != Double.NaN && prediction >= 0) { return Math.round(prediction); } else { - return 0; + return -1L; } } @@ -152,29 +158,34 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { * @return milliseconds until backpressure is predicted to occur, based on the number of bytes in the queue. */ - public int getNextIntervalCount() { + public Long getNextIntervalCount() { final BivariateStatusAnalyticsModel countModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey(); final Double prediction = countModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis()); - if (prediction != Double.NaN) { - return ((Long) Math.round(prediction)).intValue(); + if (prediction != Double.NaN && prediction >= 0) { + return Math.round(prediction); } else { - return 0; + return -1L; } } - public int getNextIntervalPercentageUseCount(){ + public Long getNextIntervalPercentageUseCount(){ final Connection connection = getConnection(); if (connection == null) { throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!"); } final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold(); + final long nextIntervalCount = getNextIntervalCount(); - return ((Long)Math.round((getNextIntervalCount()/backPressureCountThreshold) * 100)).intValue(); + if(nextIntervalCount > -1L) { + return Math.round((getNextIntervalCount() / backPressureCountThreshold) * 100); + }else{ + return -1L; + } } - public int getNextIntervalPercentageUseBytes(){ + public Long getNextIntervalPercentageUseBytes(){ final Connection connection = getConnection(); if (connection == null) { @@ -182,13 +193,17 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { } final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold(); final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B); + final long nextIntervalBytes = getNextIntervalBytes(); - return ((Long)Math.round((getNextIntervalBytes()/ backPressureBytes) * 100)).intValue(); - + if(nextIntervalBytes > -1L) { + return Math.round((getNextIntervalBytes() / backPressureBytes) * 100); + }else{ + return -1L; + } } - public long getIntervalTimeMillis(){ - return getQueryWindow().getTimeDifferenceMillis(); + public Long getIntervalTimeMillis(){ + return (5L * 60 * 1000); } @Override @@ -206,13 +221,13 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { predictions.put("timeToBytesBackpressureMillis", getTimeToBytesBackpressureMillis()); predictions.put("timeToCountBackpressureMillis", getTimeToCountBackpressureMillis()); predictions.put("nextIntervalBytes", getNextIntervalBytes()); - predictions.put("nextIntervalCount", (long) getNextIntervalCount()); - predictions.put("nextIntervalPercentageUseCount", (long)getNextIntervalPercentageUseCount()); - predictions.put("nextIntervalPercentageUseBytes", (long)getNextIntervalPercentageUseBytes()); + predictions.put("nextIntervalCount", getNextIntervalCount()); + predictions.put("nextIntervalPercentageUseCount", getNextIntervalPercentageUseCount()); + predictions.put("nextIntervalPercentageUseBytes", getNextIntervalPercentageUseBytes()); predictions.put("intervalTimeMillis", getIntervalTimeMillis()); predictions.forEach((key,value) -> { - LOG.info("Prediction model for connection id {}: {}={} ", connectionIdentifier,key,value); + LOG.debug("Prediction model for connection id {}: {}={} ", connectionIdentifier,key,value); }); return predictions; @@ -220,11 +235,11 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { @Override public boolean supportsOnlineLearning() { - return true; + return supportOnlineLearning; } private Connection getConnection() { - final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup(); + final ProcessGroup rootGroup = flowManager.getRootGroup(); Optional<Connection> connection = rootGroup.findAllConnections().stream().filter(c -> c.getIdentifier().equals(this.connectionIdentifier)).findFirst(); return connection.orElse(null); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java index 7f9db25..8f1222c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java @@ -16,28 +16,27 @@ */ package org.apache.nifi.controller.status.analytics; -import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.status.history.ComponentStatusRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine { private ComponentStatusRepository statusRepository; - private FlowController controller; + private FlowManager flowManager; private static final Logger LOG = LoggerFactory.getLogger(ConnectionStatusAnalyticsEngine.class); - public ConnectionStatusAnalyticsEngine(FlowController controller, ComponentStatusRepository statusRepository) { - this.controller = controller; + public ConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository) { + this.flowManager = flowManager; this.statusRepository = statusRepository; } @Override public StatusAnalytics getStatusAnalytics(String identifier) { - ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,controller,identifier); + ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,flowManager,identifier,false); connectionStatusAnalytics.init(); return connectionStatusAnalytics; } - } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java index 8aa4a45..21c0370 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java @@ -21,21 +21,33 @@ import java.util.stream.Stream; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.math3.stat.regression.SimpleRegression; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SimpleRegressionBSAM extends BivariateStatusAnalyticsModel { - private SimpleRegression regression; + private static final Logger LOG = LoggerFactory.getLogger(SimpleRegressionBSAM.class); + private final SimpleRegression regression; + private final Boolean clearObservationsOnLearn; + + public SimpleRegressionBSAM(Boolean clearObservationsOnLearn) { - public SimpleRegressionBSAM() { this.regression = new SimpleRegression(); + this.clearObservationsOnLearn = clearObservationsOnLearn; } @Override public void learn(Stream<Double> features, Stream<Double> labels) { double[] labelArray = ArrayUtils.toPrimitive(labels.toArray(Double[]::new)); double[][] featuresMatrix = features.map(feature -> new double[]{feature}).toArray(double[][]::new); - regression.clear(); + + if(clearObservationsOnLearn) { + regression.clear(); + } + regression.addObservations(featuresMatrix, labelArray); + LOG.debug("Model is using equation: y = {}x + {}", regression.getSlope(), regression.getIntercept()); + } @Override
