This is an automated email from the ASF dual-hosted git repository. aichrist pushed a commit to branch analytics-framework in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 7bc646751b3065e17a9f04fc572ec8d5a5cbea42 Author: Yolanda Davis <[email protected]> AuthorDate: Mon Jul 29 07:16:21 2019 -0400 NIFI-6510 Analytics Framework Introduction (#10) * DFA-9 - Initial refactor for Status Analytics - created additional interfaces for models, refactored callers to use StatusAnalytics objects with connection context. Implemented SimpleRegression model. DFA-9 - added logging * DFA-9 - relocated query window to CSA from model, adding the prediction percentages and time interval * DFA-9 - checkstyle fixes --- .../nifi/controller/status/ConnectionStatus.java | 19 ++ .../controller/status/analytics/QueryWindow.java | 59 +++++ .../status/analytics/StatusAnalytics.java | 26 +-- .../status/analytics/StatusAnalyticsEngine.java | 3 +- ...tusAnalytics.java => StatusAnalyticsModel.java} | 17 +- .../status/ConnectionStatisticsSnapshotDTO.java | 37 ++- .../analytics/BivariateStatusAnalyticsModel.java | 30 ++- .../CachingConnectionStatusAnalyticsEngine.java | 147 ++---------- .../analytics/ConnectionStatusAnalytics.java | 254 +++++++++++++++++++++ .../analytics/ConnectionStatusAnalyticsEngine.java | 121 +--------- .../status/analytics/SimpleRegressionBSAM.java | 55 +++++ .../apache/nifi/reporting/StandardEventAccess.java | 15 +- .../apache/nifi/web/StandardNiFiServiceFacade.java | 60 ++--- .../org/apache/nifi/web/api/dto/DtoFactory.java | 98 ++++---- .../nifi/web/controller/ControllerFacade.java | 59 ++--- 15 files changed, 595 insertions(+), 405 deletions(-) diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java index ee7dd45..783677a 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java @@ -45,6 +45,8 @@ public class ConnectionStatus implements Cloneable { private long nextPredictedQueuedBytes; private long predictedTimeToCountBackpressureMillis; private long predictedTimeToBytesBackpressureMillis; + private int predictedPercentCount = 0; + private int predictedPercentBytes = 0; public String getId() { return id; @@ -231,6 +233,23 @@ public class ConnectionStatus implements Cloneable { this.predictedTimeToBytesBackpressureMillis = predictedTimeToBytesBackpressureMillis; } + public int getPredictedPercentCount() { + return predictedPercentCount; + } + + public void setPredictedPercentCount(int predictedPercentCount) { + this.predictedPercentCount = predictedPercentCount; + } + + public int getPredictedPercentBytes() { + return predictedPercentBytes; + } + + public void setPredictedPercentBytes(int predictedPercentBytes) { + this.predictedPercentBytes = predictedPercentBytes; + } + + @Override public ConnectionStatus clone() { final ConnectionStatus clonedObj = new ConnectionStatus(); diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/QueryWindow.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/QueryWindow.java new file mode 100644 index 0000000..c477872 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/QueryWindow.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status.analytics; + +import java.util.Date; + +public class QueryWindow { + + private long startTimeMillis; + private long endTimeMillis; + + public QueryWindow(long startTimeMillis, long endTimeMillis) { + this.startTimeMillis = startTimeMillis; + this.endTimeMillis = endTimeMillis; + } + + public long getStartTimeMillis() { + return startTimeMillis; + } + + public void setStartTimeMillis(long startTimeMillis) { + this.startTimeMillis = startTimeMillis; + } + + public long getEndTimeMillis() { + return endTimeMillis; + } + + public void setEndTimeMillis(long endTimeMillis) { + this.endTimeMillis = endTimeMillis; + } + + public Date getStartDateTime() { + return new Date(startTimeMillis); + } + + public Date getEndDateTime() { + return new Date(endTimeMillis); + } + + public long getTimeDifferenceMillis(){ + return endTimeMillis - startTimeMillis; + } + +} diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java index 564f1c9..a65629f 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java @@ -16,32 +16,16 @@ */ package org.apache.nifi.controller.status.analytics; +import java.util.Map; + /** * The StatusAnalytics interface offers methods for accessing predicted and other values for a single component (Connection instance, e.g.) */ public interface StatusAnalytics { - /** - * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue. - * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue. - */ - long getTimeToBytesBackpressureMillis(); - - /** - * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the number of objects in the queue. - * @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue. - */ - long getTimeToCountBackpressureMillis(); + QueryWindow getQueryWindow(); + Map<String,Long> getPredictions(); + boolean supportsOnlineLearning(); - /** - * Returns the predicted total number of bytes in the queue to occur at the next configured interval (5 mins in the future, e.g.). - * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue. - */ - long getNextIntervalBytes(); - /** - * Returns the predicted number of objects in the queue to occur at the next configured interval (5 mins in the future, e.g.). - * @return milliseconds until backpressure is predicted to occur, based on the number of bytes in the queue. - */ - int getNextIntervalCount(); } diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsEngine.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsEngine.java index 5cbc333..01021c2 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsEngine.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsEngine.java @@ -18,5 +18,6 @@ package org.apache.nifi.controller.status.analytics; public interface StatusAnalyticsEngine { - ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId); + StatusAnalytics getStatusAnalytics(String componentId); + } diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java similarity index 66% copy from nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java copy to nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java index 9792ae4..72c81b1 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java @@ -16,17 +16,12 @@ */ package org.apache.nifi.controller.status.analytics; +import java.util.stream.Stream; -/** - * The ConnectionStatusAnalytics interface offers additional methods to the StatusAnalytics interface related to the supporting connection information (group ID, e.g.) - */ -public interface ConnectionStatusAnalytics extends StatusAnalytics{ +public interface StatusAnalyticsModel { + + void learn(Stream<Double> features, Stream<Double> labels); + Double predict(Double feature); + Boolean supportsOnlineLearning(); - String getGroupId(); - String getId(); - String getName(); - String getSourceId(); - String getSourceName(); - String getDestinationId(); - String getDestinationName(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java index 526bdcf..a521db4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java @@ -16,10 +16,10 @@ */ package org.apache.nifi.web.api.dto.status; -import io.swagger.annotations.ApiModelProperty; - import javax.xml.bind.annotation.XmlType; +import io.swagger.annotations.ApiModelProperty; + /** * DTO for serializing the statistics of a connection. */ @@ -39,6 +39,9 @@ public class ConnectionStatisticsSnapshotDTO implements Cloneable { private Long predictedMillisUntilBytesBackpressure = 0L; private Integer predictedCountAtNextInterval = 0; private Long predictedBytesAtNextInterval = 0L; + private Integer predictedPercentCount = 0; + private Integer predictedPercentBytes = 0; + private Long predictionIntervalMillis = 0L; /* getters / setters */ /** @@ -161,6 +164,33 @@ public class ConnectionStatisticsSnapshotDTO implements Cloneable { this.predictedBytesAtNextInterval = predictedBytesAtNextInterval; } + @ApiModelProperty("The predicted percentage of queued objects at the next configured interval.") + public Integer getPredictedPercentCount() { + return predictedPercentCount; + } + + public void setPredictedPercentCount(Integer predictedPercentCount) { + this.predictedPercentCount = predictedPercentCount; + } + + @ApiModelProperty("The predicted percentage of bytes in the queue against current threshold at the next configured interval.") + public Integer getPredictedPercentBytes() { + return predictedPercentBytes; + } + + public void setPredictedPercentBytes(Integer predictedPercentBytes) { + this.predictedPercentBytes = predictedPercentBytes; + } + + @ApiModelProperty("The prediction interval in seconds") + public Long getPredictionIntervalMillis() { + return predictionIntervalMillis; + } + + public void setPredictionIntervalMillis(Long predictionIntervalMillis) { + this.predictionIntervalMillis = predictionIntervalMillis; + } + @Override public ConnectionStatisticsSnapshotDTO clone() { final ConnectionStatisticsSnapshotDTO other = new ConnectionStatisticsSnapshotDTO(); @@ -176,6 +206,9 @@ public class ConnectionStatisticsSnapshotDTO implements Cloneable { other.setPredictedMillisUntilBytesBackpressure(getPredictedMillisUntilBytesBackpressure()); other.setPredictedCountAtNextInterval(getPredictedCountAtNextInterval()); other.setPredictedBytesAtNextInterval(getPredictedBytesAtNextInterval()); + other.setPredictedPercentCount(getPredictedPercentCount()); + other.setPredictedPercentBytes(getPredictedPercentBytes()); + other.setPredictionIntervalMillis(getPredictionIntervalMillis()); return other; } diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/BivariateStatusAnalyticsModel.java similarity index 66% rename from nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/BivariateStatusAnalyticsModel.java index 9792ae4..eff661b 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/BivariateStatusAnalyticsModel.java @@ -16,17 +16,23 @@ */ package org.apache.nifi.controller.status.analytics; +import java.util.stream.Stream; + +public abstract class BivariateStatusAnalyticsModel implements StatusAnalyticsModel { + + + public abstract void learn(Stream<Double> features, Stream<Double> labels); + + public abstract Double predict(Double feature); + + public abstract Double predictX(Double y); + + public abstract Double predictY(Double x); + + @Override + public Boolean supportsOnlineLearning() { + return false; + } + -/** - * The ConnectionStatusAnalytics interface offers additional methods to the StatusAnalytics interface related to the supporting connection information (group ID, e.g.) - */ -public interface ConnectionStatusAnalytics extends StatusAnalytics{ - - String getGroupId(); - String getId(); - String getName(); - String getSourceId(); - String getSourceName(); - String getDestinationId(); - String getDestinationName(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java index c12dbae..c4836c6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java @@ -16,19 +16,10 @@ */ package org.apache.nifi.controller.status.analytics; -import java.util.Date; -import java.util.List; import java.util.concurrent.TimeUnit; -import org.apache.commons.math3.stat.regression.SimpleRegression; -import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.status.history.ComponentStatusRepository; -import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor; -import org.apache.nifi.controller.status.history.StatusHistoryUtil; -import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; -import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,141 +29,33 @@ import com.github.benmanes.caffeine.cache.Caffeine; public class CachingConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine { private ComponentStatusRepository statusRepository; private FlowController controller; - private volatile Cache<String, SimpleRegression> cache; - private static final Logger LOG = LoggerFactory.getLogger(ConnectionStatusAnalyticsEngine.class); + private volatile Cache<String, ConnectionStatusAnalytics> cache; + private static final Logger LOG = LoggerFactory.getLogger(CachingConnectionStatusAnalyticsEngine.class); public CachingConnectionStatusAnalyticsEngine(FlowController controller, ComponentStatusRepository statusRepository) { this.controller = controller; this.statusRepository = statusRepository; this.cache = Caffeine.newBuilder() - .expireAfterWrite(1, TimeUnit.MINUTES) + .expireAfterWrite(5, TimeUnit.MINUTES) .build(); } @Override - public ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId) { - - ProcessGroup rootGroup = controller.getFlowManager().getRootGroup(); - Connection connection = rootGroup.findConnection(connectionId); - SimpleRegression cachedRegression = cache.getIfPresent(connection.getIdentifier()); - - if (cachedRegression == null) { - cachedRegression = getBackPressureRegressionModel(connection); - if (cachedRegression != null) - cache.put(connection.getIdentifier(), cachedRegression); + public StatusAnalytics getStatusAnalytics(String identifier) { + + ConnectionStatusAnalytics connectionStatusAnalytics = cache.getIfPresent(identifier); + if(connectionStatusAnalytics == null){ + LOG.info("Creating new analytics for connection id: {0}", identifier); + connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,controller,identifier); + connectionStatusAnalytics.init(); + cache.put(identifier,connectionStatusAnalytics); + }else{ + LOG.info("Pulled existing analytics from cache for connection id: {}", identifier); + connectionStatusAnalytics.refresh(); } - - ConnectionStatusAnalytics cachedResult = calculate(cachedRegression, connection); - LOG.info("Connection: " + connectionId + " Cached backpressure Time: " + cachedResult.getTimeToCountBackpressureMillis()); - return cachedResult; - } - - protected ConnectionStatusAnalytics calculate(SimpleRegression regression, Connection conn) { - long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold(); - - final long connTimeToBackpressure; - - if (regression == null) { - connTimeToBackpressure = Long.MAX_VALUE; - } else { - //If calculation returns as negative only 0 will return - connTimeToBackpressure = Math.max(0, Math.round((backPressureObjectThreshold - regression.getIntercept()) / regression.getSlope()) - - System.currentTimeMillis()); - } - - return new ConnectionStatusAnalytics() { - - @Override - public String getSourceName() { - return conn.getSource().getName(); - } - - @Override - public String getSourceId() { - return conn.getSource().getIdentifier(); - } - - @Override - public String getName() { - return conn.getName(); - } - - - @Override - public String getId() { - return conn.getIdentifier(); - } - - @Override - public long getTimeToBytesBackpressureMillis() { - return 0; - } - - @Override - public long getTimeToCountBackpressureMillis() { - return connTimeToBackpressure; - } - - @Override - public long getNextIntervalBytes() { - return 0; - } - - @Override - public int getNextIntervalCount() { - return 0; - } - - @Override - public String getGroupId() { - return conn.getProcessGroup().getIdentifier(); - } - - @Override - public String getDestinationName() { - return conn.getDestination().getName(); - } - - @Override - public String getDestinationId() { - return conn.getDestination().getIdentifier(); - } - }; + return connectionStatusAnalytics; } - /** - * Get backpressure model based on current data - * - * @param conn the connection to run the analytic on - * @return - */ - protected SimpleRegression getBackPressureRegressionModel(Connection conn) { - Date minDate = new Date(System.currentTimeMillis() - (60 * 1000)); - StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO( - statusRepository.getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE)); - List<StatusSnapshotDTO> aggregateSnapshots = connHistory.getAggregateSnapshots(); - - if (aggregateSnapshots.size() < 2) { - LOG.info("Not enough data to model time to backpressure."); - return null; - } else { - - ConnectionStatusDescriptor.QUEUED_COUNT.getField(); - SimpleRegression regression = new SimpleRegression(); - for (StatusSnapshotDTO snap : aggregateSnapshots) { - Long snapQueuedCount = snap.getStatusMetrics().get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()); - long snapTime = snap.getTimestamp().getTime(); - regression.addData(snapTime, snapQueuedCount); - } - - if (regression.getSlope() <= 0 && !conn.getFlowFileQueue().isFull()) { - LOG.info("Connection " + conn.getIdentifier() + " is not experiencing backpressure."); - return null; - } else { - return regression; - } - } - } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java new file mode 100644 index 0000000..bad2ff1 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status.analytics; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.stream.Stream; + +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.status.history.ComponentStatusRepository; +import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor; +import org.apache.nifi.controller.status.history.StatusHistory; +import org.apache.nifi.controller.status.history.StatusHistoryUtil; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.util.Tuple; +import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; +import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConnectionStatusAnalytics implements StatusAnalytics { + + private static final Logger LOG = LoggerFactory.getLogger(ConnectionStatusAnalytics.class); + private Map<String, Tuple<StatusAnalyticsModel, ExtractFunction>> modelMap; + private QueryWindow queryWindow; + private final ComponentStatusRepository componentStatusRepository; + private final String connectionIdentifier; + private final FlowController flowController; + + public ConnectionStatusAnalytics(ComponentStatusRepository componentStatusRepository, FlowController flowController, String connectionIdentifier) { + this.componentStatusRepository = componentStatusRepository; + this.flowController = flowController; + this.connectionIdentifier = connectionIdentifier; + } + + public void init() { + + if (this.modelMap == null || this.modelMap.isEmpty()) { + Tuple<StatusAnalyticsModel, ExtractFunction> countModelFunction = new Tuple<>(new SimpleRegressionBSAM(), extract); + Tuple<StatusAnalyticsModel, ExtractFunction> byteModelFunction = new Tuple<>(new SimpleRegressionBSAM(), extract); + this.modelMap = new HashMap<>(); + //TODO: Should change keys used here + this.modelMap.put(ConnectionStatusDescriptor.QUEUED_COUNT.getField(), countModelFunction); + this.modelMap.put(ConnectionStatusDescriptor.QUEUED_BYTES.getField(), byteModelFunction); + this.queryWindow = new QueryWindow(System.currentTimeMillis() - (5 * 60 * 1000), System.currentTimeMillis()); + } + + refresh(); + } + + public void refresh() { + + modelMap.forEach((metric, modelFunction) -> { + + StatusAnalyticsModel model = modelFunction.getKey(); + ExtractFunction extract = modelFunction.getValue(); + StatusHistory statusHistory = componentStatusRepository.getConnectionStatusHistory(connectionIdentifier, queryWindow.getStartDateTime(), queryWindow.getEndDateTime(), Integer.MAX_VALUE); + Tuple<Stream<Double>, Stream<Double>> modelData = extract.extractMetric(metric, statusHistory); + LOG.info("Refreshing model for connection id: {} ", connectionIdentifier); + Stream<Double> times = modelData.getKey(); + Stream<Double> counts = modelData.getValue(); + //times is the X axis and counts is on the y axis + model.learn(times, counts); + + }); + } + + /** + * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue. + * + * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue. + */ + public long getTimeToBytesBackpressureMillis() { + + final BivariateStatusAnalyticsModel bytesModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey(); + final Connection connection = getConnection(); + if (connection == null) { + throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!"); + } + final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold(); + final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B); + final double prediction = bytesModel.predictX(backPressureBytes); + if (prediction != Double.NaN) { + return Math.max(0, Math.round(prediction) - System.currentTimeMillis()); + } else { + return Long.MAX_VALUE; + } + + } + + /** + * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the number of objects in the queue. + * + * @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue. + */ + public long getTimeToCountBackpressureMillis() { + + final BivariateStatusAnalyticsModel countModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey(); + final Connection connection = getConnection(); + if (connection == null) { + throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!"); + } + final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold(); + final Double prediction = countModel.predictX(backPressureCountThreshold); + + if (prediction != Double.NaN) { + return Math.max(0, Math.round(prediction) - System.currentTimeMillis()); + } else { + return Long.MAX_VALUE; + } + } + + /** + * Returns the predicted total number of bytes in the queue to occur at the next configured interval (5 mins in the future, e.g.). + * + * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue. + */ + + public long getNextIntervalBytes() { + final BivariateStatusAnalyticsModel bytesModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey(); + final Double prediction = bytesModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis()); + if (prediction != Double.NaN) { + return Math.round(prediction); + } else { + return 0; + } + } + + /** + * Returns the predicted number of objects in the queue to occur at the next configured interval (5 mins in the future, e.g.). + * + * @return milliseconds until backpressure is predicted to occur, based on the number of bytes in the queue. + */ + + public int getNextIntervalCount() { + final BivariateStatusAnalyticsModel countModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey(); + final Double prediction = countModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis()); + if (prediction != Double.NaN) { + return ((Long) Math.round(prediction)).intValue(); + } else { + return 0; + } + } + + public int getNextIntervalPercentageUseCount(){ + + final Connection connection = getConnection(); + if (connection == null) { + throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!"); + } + final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold(); + + return ((Long)Math.round((getNextIntervalCount()/backPressureCountThreshold) * 100)).intValue(); + + } + + public int getNextIntervalPercentageUseBytes(){ + + final Connection connection = getConnection(); + if (connection == null) { + throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!"); + } + final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold(); + final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B); + + return ((Long)Math.round((getNextIntervalBytes()/ backPressureBytes) * 100)).intValue(); + + } + + public long getIntervalTimeMillis(){ + return getQueryWindow().getTimeDifferenceMillis(); + } + + @Override + public QueryWindow getQueryWindow() { + return queryWindow; + } + + /** + * Returns all available predictions + */ + @Override + public Map<String, Long> getPredictions() { + + Map<String, Long> predictions = new HashMap<>(); + predictions.put("timeToBytesBackpressureMillis", getTimeToBytesBackpressureMillis()); + predictions.put("timeToCountBackpressureMillis", getTimeToCountBackpressureMillis()); + predictions.put("nextIntervalBytes", getNextIntervalBytes()); + predictions.put("nextIntervalCount", (long) getNextIntervalCount()); + predictions.put("nextIntervalPercentageUseCount", (long)getNextIntervalPercentageUseCount()); + predictions.put("nextIntervalPercentageUseBytes", (long)getNextIntervalPercentageUseBytes()); + predictions.put("intervalTimeMillis", getIntervalTimeMillis()); + + predictions.forEach((key,value) -> { + LOG.info("Prediction model for connection id {}: {}={} ", connectionIdentifier,key,value); + }); + + return predictions; + } + + @Override + public boolean supportsOnlineLearning() { + return true; + } + + private Connection getConnection() { + final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup(); + Optional<Connection> connection = rootGroup.findAllConnections().stream().filter(c -> c.getIdentifier().equals(this.connectionIdentifier)).findFirst(); + return connection.orElse(null); + } + + private interface ExtractFunction { + Tuple<Stream<Double>, Stream<Double>> extractMetric(String metric, StatusHistory statusHistory); + } + + private final ExtractFunction extract = (metric, statusHistory) -> { + + List<Double> counts = new ArrayList<>(); + List<Double> times = new ArrayList<>(); + + StatusHistoryDTO statusHistoryDTO = StatusHistoryUtil.createStatusHistoryDTO(statusHistory); + + for (StatusSnapshotDTO snap : statusHistoryDTO.getAggregateSnapshots()) { + Long snapValue = snap.getStatusMetrics().get(metric); + long snapTime = snap.getTimestamp().getTime(); + counts.add((double) snapValue); + times.add((double) snapTime); + } + return new Tuple<>(times.stream(), counts.stream()); + + }; + + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java index 11862c8..7f9db25 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java @@ -16,18 +16,8 @@ */ package org.apache.nifi.controller.status.analytics; -import java.util.Date; -import java.util.List; - -import org.apache.commons.math3.stat.regression.SimpleRegression; -import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.status.history.ComponentStatusRepository; -import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor; -import org.apache.nifi.controller.status.history.StatusHistoryUtil; -import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; -import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,114 +33,11 @@ public class ConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine { } @Override - public ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId) { - ProcessGroup rootGroup = controller.getFlowManager().getRootGroup(); - return getConnectionStatusAnalytics(rootGroup.findConnection(connectionId)); + public StatusAnalytics getStatusAnalytics(String identifier) { + ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,controller,identifier); + connectionStatusAnalytics.init(); + return connectionStatusAnalytics; } - /** - * Finds the number of millis until the given connection will experience backpressure. - * @param conn the connection to run the analytic on - * @return - */ - public ConnectionStatusAnalytics getConnectionStatusAnalytics(Connection conn) { - LOG.debug("Getting connection history for: " + conn.getIdentifier()); - long connTimeToBackpressure; - Date minDate = new Date(System.currentTimeMillis() - (5 * 60 * 1000)); - StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO( - statusRepository.getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE)); - List<StatusSnapshotDTO> aggregateSnapshots = connHistory.getAggregateSnapshots(); - - if (aggregateSnapshots.size() < 2) { - LOG.info("Not enough data to model time to backpressure."); - connTimeToBackpressure = Long.MAX_VALUE; - } else { - - long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold(); - LOG.info("Connection " + conn.getIdentifier() + " backpressure object threshold is " + backPressureObjectThreshold); - - ConnectionStatusDescriptor.QUEUED_COUNT.getField(); - - SimpleRegression regression = new SimpleRegression(); - - for (StatusSnapshotDTO snap : aggregateSnapshots) { - Long snapQueuedCount = snap.getStatusMetrics().get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()); - long snapTime = snap.getTimestamp().getTime(); - regression.addData(snapTime, snapQueuedCount); - } - - // Skip this connection if its queue is declining. - if (regression.getSlope() <= 0) { - LOG.info("Connection " + conn.getIdentifier() + " is not experiencing backpressure."); - connTimeToBackpressure = Long.MAX_VALUE; - } else { - - // Compute time-to backpressure for this connection; Reduce total result iff - // this connection is lower. - connTimeToBackpressure = Math - .round((backPressureObjectThreshold - regression.getIntercept()) / regression.getSlope()) - - System.currentTimeMillis(); - LOG.info("Connection " + conn.getIdentifier() + " time to backpressure is " + connTimeToBackpressure); - } - } - - return new ConnectionStatusAnalytics() { - - @Override - public String getSourceName() { - return conn.getSource().getName(); - } - @Override - public String getSourceId() { - return conn.getSource().getIdentifier(); - } - - @Override - public String getName() { - return conn.getName(); - } - - @Override - public long getTimeToCountBackpressureMillis() { - return connTimeToBackpressure; - } - - // TODO - populate the other prediction fields - @Override - public long getTimeToBytesBackpressureMillis() { - return 0; - } - - @Override - public long getNextIntervalBytes() { - return 0; - } - - @Override - public int getNextIntervalCount() { - return 0; - } - - @Override - public String getId() { - return conn.getIdentifier(); - } - - @Override - public String getGroupId() { - return conn.getProcessGroup().getIdentifier(); - } - - @Override - public String getDestinationName() { - return conn.getDestination().getName(); - } - - @Override - public String getDestinationId() { - return conn.getDestination().getIdentifier(); - } - }; - } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java new file mode 100644 index 0000000..8aa4a45 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status.analytics; + + +import java.util.stream.Stream; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.math3.stat.regression.SimpleRegression; + +public class SimpleRegressionBSAM extends BivariateStatusAnalyticsModel { + + private SimpleRegression regression; + + public SimpleRegressionBSAM() { + this.regression = new SimpleRegression(); + } + + @Override + public void learn(Stream<Double> features, Stream<Double> labels) { + double[] labelArray = ArrayUtils.toPrimitive(labels.toArray(Double[]::new)); + double[][] featuresMatrix = features.map(feature -> new double[]{feature}).toArray(double[][]::new); + regression.clear(); + regression.addObservations(featuresMatrix, labelArray); + } + + @Override + public Double predict(Double feature) { + return predictY(feature); + } + + @Override + public Double predictX(Double y) { + return (y - regression.getIntercept()) / regression.getSlope(); + } + + @Override + public Double predictY(Double x) { + return regression.getSlope() * x + regression.getIntercept(); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java index 33e650b..1e36975 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -343,12 +344,16 @@ public class StandardEventAccess implements UserAwareEventAccess { } if (statusAnalyticsEngine != null) { - StatusAnalytics statusAnalytics = statusAnalyticsEngine.getConnectionStatusAnalytics(conn.getIdentifier()); + StatusAnalytics statusAnalytics = statusAnalyticsEngine.getStatusAnalytics(conn.getIdentifier()); if (statusAnalytics != null) { - connStatus.setPredictedTimeToBytesBackpressureMillis(statusAnalytics.getTimeToBytesBackpressureMillis()); - connStatus.setPredictedTimeToCountBackpressureMillis(statusAnalytics.getTimeToCountBackpressureMillis()); - connStatus.setNextPredictedQueuedBytes(statusAnalytics.getNextIntervalBytes()); - connStatus.setNextPredictedQueuedCount(statusAnalytics.getNextIntervalCount()); + Map<String,Long> predictions = statusAnalytics.getPredictions(); + connStatus.setPredictedTimeToBytesBackpressureMillis(predictions.get("timeToBytesBackpressureMillis")); + connStatus.setPredictedTimeToCountBackpressureMillis(predictions.get("timeToCountBackpressureMillis")); + connStatus.setNextPredictedQueuedBytes(predictions.get("nextIntervalBytes")); + connStatus.setNextPredictedQueuedCount(predictions.get("nextIntervalCount").intValue()); + connStatus.setPredictedPercentCount(predictions.get("nextIntervalPercentageUseCount").intValue()); + connStatus.setPredictedPercentBytes(predictions.get("nextIntervalPercentageUseBytes").intValue()); + connStatus.setPredictionIntervalMillis(predictions.get("intervalTimeMillis")); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 35f27ec..5e53fda 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -16,7 +16,35 @@ */ package org.apache.nifi.web; -import com.google.common.collect.Sets; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; + import org.apache.commons.collections4.CollectionUtils; import org.apache.nifi.action.Action; import org.apache.nifi.action.Component; @@ -310,33 +338,7 @@ import org.apache.nifi.web.util.SnippetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import com.google.common.collect.Sets; /** * Implementation of NiFiServiceFacade that performs revision checking. @@ -3198,7 +3200,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { public ConnectionStatisticsEntity getConnectionStatistics(final String connectionId) { final Connection connection = connectionDAO.getConnection(connectionId); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); - final ConnectionStatisticsDTO dto = dtoFactory.createConnectionStatisticsDto(controllerFacade.getConnectionStatistics(connectionId)); + final ConnectionStatisticsDTO dto = dtoFactory.createConnectionStatisticsDto(connection, controllerFacade.getConnectionStatusAnalytics(connectionId)); return entityFactory.createConnectionStatisticsEntity(dto, permissions); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 18781d9..34c49c0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -16,6 +16,34 @@ */ package org.apache.nifi.web.api.dto; +import java.text.Collator; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TimeZone; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import javax.ws.rs.WebApplicationException; + import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.StringUtils; @@ -106,7 +134,7 @@ import org.apache.nifi.controller.status.PortStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus; -import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalytics; +import org.apache.nifi.controller.status.analytics.StatusAnalytics; import org.apache.nifi.controller.status.history.GarbageCollectionHistory; import org.apache.nifi.controller.status.history.GarbageCollectionStatus; import org.apache.nifi.diagnostics.GarbageCollection; @@ -236,33 +264,6 @@ import org.apache.nifi.web.api.entity.VariableEntity; import org.apache.nifi.web.controller.ControllerFacade; import org.apache.nifi.web.revision.RevisionManager; -import javax.ws.rs.WebApplicationException; -import java.text.Collator; -import java.text.NumberFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TimeZone; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; - public final class DtoFactory { @SuppressWarnings("rawtypes") @@ -1199,30 +1200,35 @@ public final class DtoFactory { return connectionStatusDto; } - public ConnectionStatisticsDTO createConnectionStatisticsDto(final ConnectionStatusAnalytics connectionStatistics) { + public ConnectionStatisticsDTO createConnectionStatisticsDto(final Connection connection, final StatusAnalytics statusAnalytics) { final ConnectionStatisticsDTO connectionStatisticsDTO = new ConnectionStatisticsDTO(); - connectionStatisticsDTO.setGroupId(connectionStatistics.getGroupId()); - connectionStatisticsDTO.setId(connectionStatistics.getId()); - connectionStatisticsDTO.setName(connectionStatistics.getName()); - connectionStatisticsDTO.setSourceId(connectionStatistics.getSourceId()); - connectionStatisticsDTO.setSourceName(connectionStatistics.getSourceName()); - connectionStatisticsDTO.setDestinationId(connectionStatistics.getDestinationId()); - connectionStatisticsDTO.setDestinationName(connectionStatistics.getDestinationName()); + + connectionStatisticsDTO.setGroupId(connection.getProcessGroup().getIdentifier()); + connectionStatisticsDTO.setId(connection.getIdentifier()); + connectionStatisticsDTO.setName(connection.getName()); + connectionStatisticsDTO.setSourceId(connection.getSource().getIdentifier()); + connectionStatisticsDTO.setSourceName(connection.getSource().getName()); + connectionStatisticsDTO.setDestinationId(connection.getDestination().getIdentifier()); + connectionStatisticsDTO.setDestinationName(connection.getDestination().getName()); connectionStatisticsDTO.setStatsLastRefreshed(new Date()); final ConnectionStatisticsSnapshotDTO snapshot = new ConnectionStatisticsSnapshotDTO(); connectionStatisticsDTO.setAggregateSnapshot(snapshot); - snapshot.setId(connectionStatistics.getId()); - snapshot.setGroupId(connectionStatistics.getGroupId()); - snapshot.setName(connectionStatistics.getName()); - snapshot.setSourceName(connectionStatistics.getSourceName()); - snapshot.setDestinationName(connectionStatistics.getDestinationName()); - - snapshot.setPredictedMillisUntilBytesBackpressure(connectionStatistics.getTimeToBytesBackpressureMillis()); - snapshot.setPredictedMillisUntilCountBackpressure(connectionStatistics.getTimeToCountBackpressureMillis()); - snapshot.setPredictedBytesAtNextInterval(connectionStatistics.getNextIntervalBytes()); - snapshot.setPredictedCountAtNextInterval(connectionStatistics.getNextIntervalCount()); + snapshot.setId(connection.getIdentifier()); + snapshot.setGroupId(connection.getProcessGroup().getIdentifier()); + snapshot.setName(connection.getName()); + snapshot.setSourceName(connection.getSource().getName()); + snapshot.setDestinationName(connection.getDestination().getName()); + + Map<String,Long> predictions = statusAnalytics.getPredictions(); + snapshot.setPredictedMillisUntilBytesBackpressure(predictions.get("timeToBytesBackpressureMillis")); + snapshot.setPredictedMillisUntilCountBackpressure(predictions.get("timeToCountBackpressureMillis")); + snapshot.setPredictedBytesAtNextInterval(predictions.get("nextIntervalBytes")); + snapshot.setPredictedCountAtNextInterval(predictions.get("nextIntervalCount").intValue()); + snapshot.setPredictedPercentBytes(predictions.get("nextIntervalPercentageUseBytes").intValue()); + snapshot.setPredictedPercentCount(predictions.get("nextIntervalPercentageUseCount").intValue()); + snapshot.setPredictionIntervalMillis(predictions.get("intervalTimeMillis")); return connectionStatisticsDTO; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index 95f0713..d7714da 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -16,6 +16,30 @@ */ package org.apache.nifi.web.controller; +import java.io.IOException; +import java.io.InputStream; +import java.text.Collator; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TimeZone; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import javax.ws.rs.WebApplicationException; + import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.StringUtils; @@ -56,7 +80,7 @@ import org.apache.nifi.controller.status.PortStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus; -import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalytics; +import org.apache.nifi.controller.status.analytics.StatusAnalytics; import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine; import org.apache.nifi.controller.status.history.ComponentStatusRepository; import org.apache.nifi.diagnostics.SystemDiagnostics; @@ -112,29 +136,6 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.WebApplicationException; -import java.io.IOException; -import java.io.InputStream; -import java.text.Collator; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TimeZone; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.stream.Collectors; - public class ControllerFacade implements Authorizable { private static final Logger logger = LoggerFactory.getLogger(ControllerFacade.class); @@ -687,12 +688,12 @@ public class ControllerFacade implements Authorizable { } /** - * Gets analytical statistics for the specified connection. + * Gets status analytics for the specified connection. * * @param connectionId connection id * @return the statistics for the specified connection */ - public ConnectionStatusAnalytics getConnectionStatistics(final String connectionId) { + public StatusAnalytics getConnectionStatusAnalytics(final String connectionId) { final ProcessGroup root = getRootGroup(); final Connection connection = root.findConnection(connectionId); @@ -709,12 +710,12 @@ public class ControllerFacade implements Authorizable { } // get from flow controller - final StatusAnalyticsEngine status = flowController.getStatusAnalyticsEngine(); - if (status == null) { + final StatusAnalyticsEngine statusAnalyticsEngine = flowController.getStatusAnalyticsEngine(); + if (statusAnalyticsEngine == null) { throw new ResourceNotFoundException(String.format("Unable to locate connection with id '%s'.", connectionId)); } - return status.getConnectionStatusAnalytics(connectionId); + return statusAnalyticsEngine.getStatusAnalytics(connectionId); } /**
