This is an automated email from the ASF dual-hosted git repository. aichrist pushed a commit to branch analytics-framework in repository https://gitbox.apache.org/repos/asf/nifi.git
commit b90430abfef092b6d223dc378723155aaeab750e Author: Matthew Burgess <[email protected]> AuthorDate: Mon Jul 22 18:11:58 2019 -0400 NIFI-6510 Updated objects and interfaces to reflect 4 prediction metrics (cherry picked from commit 050e0fc) (cherry picked from commit 9fd365f) --- .../nifi/controller/status/ConnectionStatus.java | 48 ++++++++++++++ .../analytics/ConnectionStatusAnalytics.java | 26 +++++++- .../status/analytics/StatusAnalytics.java | 30 ++++++++- .../status/ConnectionStatisticsSnapshotDTO.java | 47 ++++++++++++-- .../dto/status/ConnectionStatusSnapshotDTO.java | 44 +++++++++++++ .../entity/ConnectionStatisticsSnapshotEntity.java | 1 - .../org/apache/nifi/controller/FlowController.java | 75 +++++++++------------- .../status/analytics/StatusAnalyticEngine.java | 42 +++++++++++- .../apache/nifi/reporting/StandardEventAccess.java | 9 +++ .../org/apache/nifi/web/api/dto/DtoFactory.java | 9 ++- 10 files changed, 274 insertions(+), 57 deletions(-) diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java index 341fda2..0e5d306 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java @@ -40,6 +40,10 @@ public class ConnectionStatus implements Cloneable { private long outputBytes; private int maxQueuedCount; private long maxQueuedBytes; + private int nextPredictedQueuedCount; + private long nextPredictedQueuedBytes; + private long predictedTimeToCountBackpressureMillis; + private long predictedTimeToBytesBackpressureMillis; public String getId() { return id; @@ -186,6 +190,38 @@ public class ConnectionStatus implements Cloneable { this.backPressureBytesThreshold = backPressureBytesThreshold; } + public int getNextPredictedQueuedCount() { + return nextPredictedQueuedCount; + } + + public void setNextPredictedQueuedCount(int 
nextPredictedQueuedCount) { + this.nextPredictedQueuedCount = nextPredictedQueuedCount; + } + + public long getNextPredictedQueuedBytes() { + return nextPredictedQueuedBytes; + } + + public void setNextPredictedQueuedBytes(long nextPredictedQueuedBytes) { + this.nextPredictedQueuedBytes = nextPredictedQueuedBytes; + } + + public long getPredictedTimeToCountBackpressureMillis() { + return predictedTimeToCountBackpressureMillis; + } + + public void setPredictedTimeToCountBackpressureMillis(long predictedTimeToCountBackpressureMillis) { + this.predictedTimeToCountBackpressureMillis = predictedTimeToCountBackpressureMillis; + } + + public long getPredictedTimeToBytesBackpressureMillis() { + return predictedTimeToBytesBackpressureMillis; + } + + public void setPredictedTimeToBytesBackpressureMillis(long predictedTimeToBytesBackpressureMillis) { + this.predictedTimeToBytesBackpressureMillis = predictedTimeToBytesBackpressureMillis; + } + @Override public ConnectionStatus clone() { final ConnectionStatus clonedObj = new ConnectionStatus(); @@ -206,6 +242,10 @@ public class ConnectionStatus implements Cloneable { clonedObj.backPressureObjectThreshold = backPressureObjectThreshold; clonedObj.maxQueuedBytes = maxQueuedBytes; clonedObj.maxQueuedCount = maxQueuedCount; + clonedObj.nextPredictedQueuedBytes = nextPredictedQueuedBytes; + clonedObj.nextPredictedQueuedCount = nextPredictedQueuedCount; + clonedObj.predictedTimeToBytesBackpressureMillis = predictedTimeToBytesBackpressureMillis; + clonedObj.predictedTimeToCountBackpressureMillis = predictedTimeToCountBackpressureMillis; return clonedObj; } @@ -246,6 +286,14 @@ public class ConnectionStatus implements Cloneable { builder.append(maxQueuedCount); builder.append(", maxQueueBytes="); builder.append(maxQueuedBytes); + builder.append(", nextPredictedQueuedBytes="); + builder.append(nextPredictedQueuedBytes); + builder.append(", nextPredictedQueuedCount="); + builder.append(nextPredictedQueuedCount); + builder.append(", 
predictedTimeToBytesBackpressureMillis="); + builder.append(predictedTimeToBytesBackpressureMillis); + builder.append(", predictedTimeToCountBackpressureMillis="); + builder.append(predictedTimeToCountBackpressureMillis); builder.append("]"); return builder.toString(); } diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java index 12c8a15..2380d55 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java @@ -17,7 +17,31 @@ package org.apache.nifi.controller.status.analytics; public interface ConnectionStatusAnalytics { - long getMinTimeToBackpressureMillis(); + + /** + * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue. + * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue. + */ + long getTimeToBytesBackpressureMillis(); + + /** + * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the number of objects in the queue. + * @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue. + */ + long getTimeToCountBackpressureMillis(); + + /** + * Returns the predicted total number of bytes in the queue to occur at the next configured interval (5 mins in the future, e.g.). + * @return the predicted total number of bytes in the queue at the next configured interval. + */ + long getNextIntervalBytes(); + + /** + * Returns the predicted number of objects in the queue to occur at the next configured interval (5 mins in the future, e.g.). 
+ * @return the predicted number of objects in the queue at the next configured interval. + */ + int getNextIntervalCount(); + String getGroupId(); String getId(); String getName(); diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java index 42c2abd..7d29314 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java @@ -21,7 +21,35 @@ package org.apache.nifi.controller.status.analytics; */ public interface StatusAnalytics { + /** + * Returns a ConnectionStatusAnalytics object containing all relevant metrics and analytical and statistical objects, as well as identity information for the connection. + * + * @param connectionId The unique ID of the connection + * @return A ConnectionStatusAnalytics object + */ ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId); - public long getMinTimeToBackpressureMillis(); + /** + * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue. + * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue. + */ + long getTimeToBytesBackpressureMillis(); + + /** + * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the number of objects in the queue. + * @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue. + */ + long getTimeToCountBackpressureMillis(); + + /** + * Returns the predicted total number of bytes in the queue to occur at the next configured interval (5 mins in the future, e.g.). 
+ * @return the predicted total number of bytes in the queue at the next configured interval. + */ + long getNextIntervalBytes(); + + /** + * Returns the predicted number of objects in the queue to occur at the next configured interval (5 mins in the future, e.g.). + * @return the predicted number of objects in the queue at the next configured interval. + */ + int getNextIntervalCount(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java index e914f74..526bdcf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java @@ -35,7 +35,10 @@ public class ConnectionStatisticsSnapshotDTO implements Cloneable { private String destinationId; private String destinationName; - private Long predictedMillisUntilBackpressure = 0L; + private Long predictedMillisUntilCountBackpressure = 0L; + private Long predictedMillisUntilBytesBackpressure = 0L; + private Integer predictedCountAtNextInterval = 0; + private Long predictedBytesAtNextInterval = 0L; /* getters / setters */ /** @@ -122,13 +125,40 @@ public class ConnectionStatisticsSnapshotDTO implements Cloneable { this.destinationName = destinationName; } - @ApiModelProperty("The predicted number of milliseconds before the connection will have backpressure applied.") - public Long getPredictedMillisUntilBackpressure() { - return predictedMillisUntilBackpressure; + @ApiModelProperty("The predicted number of milliseconds before the connection will have 
backpressure applied, based on the queued count.") + public Long getPredictedMillisUntilCountBackpressure() { + return predictedMillisUntilCountBackpressure; } - public void setPredictedMillisUntilBackpressure(Long predictedMillisUntilBackpressure) { - this.predictedMillisUntilBackpressure = predictedMillisUntilBackpressure; + public void setPredictedMillisUntilCountBackpressure(Long predictedMillisUntilCountBackpressure) { + this.predictedMillisUntilCountBackpressure = predictedMillisUntilCountBackpressure; + } + + @ApiModelProperty("The predicted number of milliseconds before the connection will have backpressure applied, based on the total number of bytes in the queue.") + public Long getPredictedMillisUntilBytesBackpressure() { + return predictedMillisUntilBytesBackpressure; + } + + public void setPredictedMillisUntilBytesBackpressure(Long predictedMillisUntilBytesBackpressure) { + this.predictedMillisUntilBytesBackpressure = predictedMillisUntilBytesBackpressure; + } + + @ApiModelProperty("The predicted number of queued objects at the next configured interval.") + public Integer getPredictedCountAtNextInterval() { + return predictedCountAtNextInterval; + } + + public void setPredictedCountAtNextInterval(Integer predictedCountAtNextInterval) { + this.predictedCountAtNextInterval = predictedCountAtNextInterval; + } + + @ApiModelProperty("The predicted total number of bytes in the queue at the next configured interval.") + public Long getPredictedBytesAtNextInterval() { + return predictedBytesAtNextInterval; + } + + public void setPredictedBytesAtNextInterval(Long predictedBytesAtNextInterval) { + this.predictedBytesAtNextInterval = predictedBytesAtNextInterval; } @Override @@ -142,7 +172,10 @@ public class ConnectionStatisticsSnapshotDTO implements Cloneable { other.setSourceId(getSourceId()); other.setSourceName(getSourceName()); - other.setPredictedMillisUntilBackpressure(getPredictedMillisUntilBackpressure()); + 
other.setPredictedMillisUntilCountBackpressure(getPredictedMillisUntilCountBackpressure()); + other.setPredictedMillisUntilBytesBackpressure(getPredictedMillisUntilBytesBackpressure()); + other.setPredictedCountAtNextInterval(getPredictedCountAtNextInterval()); + other.setPredictedBytesAtNextInterval(getPredictedBytesAtNextInterval()); return other; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java index 7ba93cc..3237385 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java @@ -48,6 +48,10 @@ public class ConnectionStatusSnapshotDTO implements Cloneable { private String queuedCount; private Integer percentUseCount; private Integer percentUseBytes; + private Long predictedMillisUntilCountBackpressure = 0L; + private Long predictedMillisUntilBytesBackpressure = 0L; + private Integer predictedCountAtNextInterval = 0; + private Long predictedBytesAtNextInterval = 0L; /* getters / setters */ /** @@ -271,6 +275,42 @@ public class ConnectionStatusSnapshotDTO implements Cloneable { this.percentUseBytes = percentUseBytes; } + @ApiModelProperty("The predicted number of milliseconds before the connection will have backpressure applied, based on the queued count.") + public Long getPredictedMillisUntilCountBackpressure() { + return predictedMillisUntilCountBackpressure; + } + + public void setPredictedMillisUntilCountBackpressure(Long predictedMillisUntilCountBackpressure) { + this.predictedMillisUntilCountBackpressure = 
predictedMillisUntilCountBackpressure; + } + + @ApiModelProperty("The predicted number of milliseconds before the connection will have backpressure applied, based on the total number of bytes in the queue.") + public Long getPredictedMillisUntilBytesBackpressure() { + return predictedMillisUntilBytesBackpressure; + } + + public void setPredictedMillisUntilBytesBackpressure(Long predictedMillisUntilBytesBackpressure) { + this.predictedMillisUntilBytesBackpressure = predictedMillisUntilBytesBackpressure; + } + + @ApiModelProperty("The predicted number of queued objects at the next configured interval.") + public Integer getPredictedCountAtNextInterval() { + return predictedCountAtNextInterval; + } + + public void setPredictedCountAtNextInterval(Integer predictedCountAtNextInterval) { + this.predictedCountAtNextInterval = predictedCountAtNextInterval; + } + + @ApiModelProperty("The predicted total number of bytes in the queue at the next configured interval.") + public Long getPredictedBytesAtNextInterval() { + return predictedBytesAtNextInterval; + } + + public void setPredictedBytesAtNextInterval(Long predictedBytesAtNextInterval) { + this.predictedBytesAtNextInterval = predictedBytesAtNextInterval; + } + @Override public ConnectionStatusSnapshotDTO clone() { final ConnectionStatusSnapshotDTO other = new ConnectionStatusSnapshotDTO(); @@ -295,6 +335,10 @@ public class ConnectionStatusSnapshotDTO implements Cloneable { other.setQueuedSize(getQueuedSize()); other.setPercentUseBytes(getPercentUseBytes()); other.setPercentUseCount(getPercentUseCount()); + other.setPredictedMillisUntilCountBackpressure(getPredictedMillisUntilCountBackpressure()); + other.setPredictedMillisUntilBytesBackpressure(getPredictedMillisUntilBytesBackpressure()); + other.setPredictedCountAtNextInterval(getPredictedCountAtNextInterval()); + other.setPredictedBytesAtNextInterval(getPredictedBytesAtNextInterval()); return other; } diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java index da7e5ca..6f4eee5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java @@ -19,7 +19,6 @@ package org.apache.nifi.web.api.entity; import io.swagger.annotations.ApiModelProperty; import org.apache.nifi.web.api.dto.ReadablePermission; import org.apache.nifi.web.api.dto.status.ConnectionStatisticsSnapshotDTO; -import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO; /** * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 462b113..5f67b49 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -16,39 +16,6 @@ */ package org.apache.nifi.controller; -import static java.util.Objects.requireNonNull; - -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.management.GarbageCollectorMXBean; -import java.lang.management.ManagementFactory; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; - -import javax.net.ssl.SSLContext; - import org.apache.commons.lang3.StringUtils; import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; @@ -217,6 +184,38 @@ import 
org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLContext; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + public class FlowController implements ReportingTaskProvider, Authorizable, NodeTypeProvider { // default repository implementations @@ -608,16 +607,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node analyticsEngine = new CachingStatusAnalyticEngine(this, componentStatusRepository); - timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - try { - analyticsEngine.refreshModel(); - } catch (final Exception e) { - LOG.error("Failed to refresh model:", e); - } - } - }, 1, 1, TimeUnit.MINUTES); //FIXME use a real/configured interval (or maybe just compute on the fly when requested) this.connectionStatus = new NodeConnectionStatus(nodeId, 
DisconnectionCode.NOT_YET_CONNECTED); heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java index 64c2065..024c138 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java @@ -113,10 +113,26 @@ public class StatusAnalyticEngine implements StatusAnalytics { } @Override - public long getMinTimeToBackpressureMillis() { + public long getTimeToCountBackpressureMillis() { return connTimeToBackpressure; } + // TODO - populate the other prediction fields + @Override + public long getTimeToBytesBackpressureMillis() { + return 0; + } + + @Override + public long getNextIntervalBytes() { + return 0; + } + + @Override + public int getNextIntervalCount() { + return 0; + } + @Override public String getId() { return conn.getIdentifier(); @@ -139,7 +155,6 @@ public class StatusAnalyticEngine implements StatusAnalytics { }; } - @Override public long getMinTimeToBackpressureMillis() { ProcessGroup rootGroup = controller.getFlowManager().getRootGroup(); List<Connection> allConnections = rootGroup.findAllConnections(); @@ -148,10 +163,31 @@ public class StatusAnalyticEngine implements StatusAnalytics { for (Connection conn : allConnections) { ConnectionStatusAnalytics connAnalytics = getConnectionStatusAnalytics(conn); - minTimeToBackpressure = Math.min(minTimeToBackpressure, connAnalytics.getMinTimeToBackpressureMillis()); + minTimeToBackpressure = Math.min(minTimeToBackpressure, 
connAnalytics.getTimeToCountBackpressureMillis()); } LOG.info("Min time to backpressure is: " + Long.toString(minTimeToBackpressure)); return minTimeToBackpressure; } + + // TODO - populate the prediction fields. Do we need to pass in connection ID? + @Override + public long getTimeToCountBackpressureMillis() { + return 0; + } + + @Override + public long getTimeToBytesBackpressureMillis() { + return 0; + } + + @Override + public long getNextIntervalBytes() { + return 0; + } + + @Override + public int getNextIntervalCount() { + return 0; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java index 0b4b73c..f943856 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java @@ -51,6 +51,7 @@ import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import org.apache.nifi.controller.status.RunStatus; import org.apache.nifi.controller.status.TransmissionStatus; +import org.apache.nifi.controller.status.analytics.StatusAnalytics; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.history.History; @@ -338,6 +339,14 @@ public class StandardEventAccess implements UserAwareEventAccess { bytesTransferred += connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut(); } + final StatusAnalytics statusAnalytics = flowController.getStatusAnalytics(); + if (statusAnalytics != null) { + 
connStatus.setPredictedTimeToBytesBackpressureMillis(statusAnalytics.getTimeToBytesBackpressureMillis()); + connStatus.setPredictedTimeToCountBackpressureMillis(statusAnalytics.getTimeToCountBackpressureMillis()); + connStatus.setNextPredictedQueuedBytes(statusAnalytics.getNextIntervalBytes()); + connStatus.setNextPredictedQueuedCount(statusAnalytics.getNextIntervalCount()); + } + if (isConnectionAuthorized) { if (StringUtils.isNotBlank(conn.getName())) { connStatus.setName(conn.getName()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index b8fed37..2ff12e6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -1179,9 +1179,13 @@ public final class DtoFactory { if (connectionStatus.getBackPressureObjectThreshold() > 0) { snapshot.setPercentUseCount(Math.min(100, StatusMerger.getUtilization(connectionStatus.getQueuedCount(), connectionStatus.getBackPressureObjectThreshold()))); + snapshot.setPredictedMillisUntilCountBackpressure(connectionStatus.getPredictedTimeToCountBackpressureMillis()); + snapshot.setPredictedCountAtNextInterval(connectionStatus.getNextPredictedQueuedCount()); } if (connectionStatus.getBackPressureBytesThreshold() > 0) { snapshot.setPercentUseBytes(Math.min(100, StatusMerger.getUtilization(connectionStatus.getQueuedBytes(), connectionStatus.getBackPressureBytesThreshold()))); + snapshot.setPredictedMillisUntilBytesBackpressure(connectionStatus.getPredictedTimeToBytesBackpressureMillis()); + snapshot.setPredictedBytesAtNextInterval(connectionStatus.getNextPredictedQueuedBytes()); } 
StatusMerger.updatePrettyPrintedFields(snapshot); @@ -1209,7 +1213,10 @@ public final class DtoFactory { snapshot.setSourceName(connectionStatistics.getSourceName()); snapshot.setDestinationName(connectionStatistics.getDestinationName()); - snapshot.setPredictedMillisUntilBackpressure(connectionStatistics.getMinTimeToBackpressureMillis()); + snapshot.setPredictedMillisUntilBytesBackpressure(connectionStatistics.getTimeToBytesBackpressureMillis()); + snapshot.setPredictedMillisUntilCountBackpressure(connectionStatistics.getTimeToCountBackpressureMillis()); + snapshot.setPredictedBytesAtNextInterval(connectionStatistics.getNextIntervalBytes()); + snapshot.setPredictedCountAtNextInterval(connectionStatistics.getNextIntervalCount()); return connectionStatisticsDTO; }
