This is an automated email from the ASF dual-hosted git repository. aichrist pushed a commit to branch analytics-framework in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 0d078358fa08891c6113302acb0376877a6726db Author: Yolanda Davis <[email protected]> AuthorDate: Tue Jul 23 01:38:06 2019 -0400 NIFI-6510 adjustments for interface updates, added call to StandardEventAccess, updated interface to use connection id (cherry picked from commit 14854ff) DFA-9 - reduced snapshot interval to 1 minute (cherry picked from commit 36abb0a) --- .../status/analytics/StatusAnalytics.java | 12 ++-- .../org/apache/nifi/controller/FlowController.java | 74 +++++++++++----------- .../analytics/CachingStatusAnalyticEngine.java | 70 +++++++++++++------- .../status/analytics/StatusAnalyticEngine.java | 8 +-- .../apache/nifi/reporting/StandardEventAccess.java | 11 ++-- 5 files changed, 102 insertions(+), 73 deletions(-) diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java index 7d29314..45e1c12 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java @@ -32,24 +32,28 @@ public interface StatusAnalytics { /** * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue. * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue. + * @param connectionId */ - long getTimeToBytesBackpressureMillis(); + long getTimeToBytesBackpressureMillis(String connectionId); /** * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the number of objects in the queue. * @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue. + * @param connectionId */ - long getTimeToCountBackpressureMillis(); + long getTimeToCountBackpressureMillis(String connectionId); /** * Returns the predicted total number of bytes in the queue to occur at the next configured interval (5 mins in the future, e.g.). * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue. + * @param connectionId */ - long getNextIntervalBytes(); + long getNextIntervalBytes(String connectionId); /** * Returns the predicted number of objects in the queue to occur at the next configured interval (5 mins in the future, e.g.). * @return milliseconds until backpressure is predicted to occur, based on the number of bytes in the queue. + * @param connectionId */ - int getNextIntervalCount(); + int getNextIntervalCount(String connectionId); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 5f67b49..450d944 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -16,6 +16,39 @@ */ package org.apache.nifi.controller; +import static java.util.Objects.requireNonNull; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import javax.net.ssl.SSLContext; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; @@ -184,38 +217,6 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.management.GarbageCollectorMXBean; -import java.lang.management.ManagementFactory; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; - -import static java.util.Objects.requireNonNull; - public class FlowController implements ReportingTaskProvider, Authorizable, NodeTypeProvider { // default repository implementations @@ -350,7 +351,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node // guarded by rwLock private NodeConnectionStatus connectionStatus; - private CachingStatusAnalyticEngine analyticsEngine; + private StatusAnalytics analyticsEngine; // guarded by rwLock private String instanceId; @@ -592,8 +593,10 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node zooKeeperStateServer = null; } - eventAccess = new StandardEventAccess(this, flowFileEventRepository); componentStatusRepository = createComponentStatusRepository(); + analyticsEngine = new CachingStatusAnalyticEngine(this, componentStatusRepository); + eventAccess = new StandardEventAccess(this, flowFileEventRepository); + timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { @Override public void run() { @@ -605,9 +608,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node } }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS); - analyticsEngine = new CachingStatusAnalyticEngine(this, componentStatusRepository); - - this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED); heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java index 5241c4a..864a5d4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java @@ -2,6 +2,7 @@ package org.apache.nifi.controller.status.analytics; import java.util.Date; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.commons.math3.stat.regression.SimpleRegression; import org.apache.nifi.connectable.Connection; @@ -28,6 +29,7 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics { this.controller = controller; this.statusRepository = statusRepository; this.cache = Caffeine.newBuilder() + .expireAfterWrite(1,TimeUnit.MINUTES) .build(); } @@ -38,15 +40,37 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics { Connection connection = rootGroup.findConnection(connectionId); SimpleRegression cachedRegression = cache.getIfPresent(connection.getIdentifier()); - if(cachedRegression != null) { - cache.put(connection.getIdentifier(), cachedRegression); + if(cachedRegression == null) { + cachedRegression = getBackPressureRegressionModel(connection); + if(cachedRegression != null) + cache.put(connection.getIdentifier(), cachedRegression); } ConnectionStatusAnalytics cachedResult = calculate(cachedRegression,connection); - LOG.info("Connection: " + connectionId + " Cached backpressure Time: " + cachedResult.getMinTimeToBackpressureMillis() ); + LOG.info("Connection: " + connectionId + " Cached backpressure Time: " + cachedResult.getTimeToCountBackpressureMillis()); return cachedResult; } + @Override + public long getTimeToBytesBackpressureMillis(String connectionId) { + return 0; + } + + @Override + public long getTimeToCountBackpressureMillis(String connectionId) { + return getConnectionStatusAnalytics(connectionId).getTimeToCountBackpressureMillis(); + } + + @Override + public long getNextIntervalBytes(String connectionId) { + return 0; + } + + @Override + public int getNextIntervalCount(String connectionId) { + return 0; + } + protected ConnectionStatusAnalytics calculate(SimpleRegression regression, Connection conn){ long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold(); @@ -77,14 +101,30 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics { return conn.getName(); } + + @Override + public String getId() { + return conn.getIdentifier(); + } + @Override - public long getMinTimeToBackpressureMillis() { + public long getTimeToBytesBackpressureMillis() { + return 0; + } + + @Override + public long getTimeToCountBackpressureMillis() { return connTimeToBackpressure; } @Override - public String getId() { - return conn.getIdentifier(); + public long getNextIntervalBytes() { + return 0; + } + + @Override + public int getNextIntervalCount() { + return 0; } @Override @@ -111,7 +151,7 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics { * @return */ protected SimpleRegression getBackPressureRegressionModel(Connection conn) { - Date minDate = new Date(System.currentTimeMillis() - (5 * 60 * 1000)); + Date minDate = new Date(System.currentTimeMillis() - (60 * 1000)); StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO( statusRepository.getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE)); List<StatusSnapshotDTO> aggregateSnapshots = connHistory.getAggregateSnapshots(); @@ -128,7 +168,6 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics { Long snapQueuedCount = snap.getStatusMetrics().get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()); long snapTime = snap.getTimestamp().getTime(); regression.addData(snapTime, snapQueuedCount); - LOG.info("Connection " + conn.getIdentifier() + " statistics: ("+snapTime+","+snapQueuedCount+")"); } if (regression.getSlope() <= 0 && !conn.getFlowFileQueue().isFull()) { @@ -141,20 +180,5 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics { } - public void refreshModel() { - ProcessGroup rootGroup = controller.getFlowManager().getRootGroup(); - List<Connection> allConnections = rootGroup.findAllConnections(); - cache.invalidateAll(); - for (Connection conn : allConnections) { - SimpleRegression regression = getBackPressureRegressionModel(conn); - if(regression != null) { - cache.put(conn.getIdentifier(), regression); - } - } - } - @Override - public long getMinTimeToBackpressureMillis() { - return 0; - } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java index 024c138..5a873d5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java @@ -172,22 +172,22 @@ public class StatusAnalyticEngine implements StatusAnalytics { // TODO - populate the prediction fields. Do we need to pass in connection ID? @Override - public long getTimeToCountBackpressureMillis() { + public long getTimeToCountBackpressureMillis(String connectionId) { return 0; } @Override - public long getTimeToBytesBackpressureMillis() { + public long getTimeToBytesBackpressureMillis(String connectionId) { return 0; } @Override - public long getNextIntervalBytes() { + public long getNextIntervalBytes(String connectionId) { return 0; } @Override - public int getNextIntervalCount() { + public int getNextIntervalCount(String connectionId) { return 0; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java index f943856..87fcd4d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java @@ -65,10 +65,12 @@ import org.apache.nifi.remote.RemoteGroupPort; public class StandardEventAccess implements UserAwareEventAccess { private final FlowFileEventRepository flowFileEventRepository; private final FlowController flowController; + private final StatusAnalytics statusAnalytics; public StandardEventAccess(final FlowController flowController, final FlowFileEventRepository flowFileEventRepository) { this.flowController = flowController; this.flowFileEventRepository = flowFileEventRepository; + this.statusAnalytics = flowController.getStatusAnalytics(); } /** @@ -339,12 +341,11 @@ public class StandardEventAccess implements UserAwareEventAccess { bytesTransferred += connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut(); } - final StatusAnalytics statusAnalytics = flowController.getStatusAnalytics(); if (statusAnalytics != null) { - connStatus.setPredictedTimeToBytesBackpressureMillis(statusAnalytics.getTimeToBytesBackpressureMillis()); - connStatus.setPredictedTimeToCountBackpressureMillis(statusAnalytics.getTimeToCountBackpressureMillis()); - connStatus.setNextPredictedQueuedBytes(statusAnalytics.getNextIntervalBytes()); - connStatus.setNextPredictedQueuedCount(statusAnalytics.getNextIntervalCount()); + connStatus.setPredictedTimeToBytesBackpressureMillis(statusAnalytics.getTimeToBytesBackpressureMillis(conn.getIdentifier())); + connStatus.setPredictedTimeToCountBackpressureMillis(statusAnalytics.getTimeToCountBackpressureMillis(conn.getIdentifier())); + connStatus.setNextPredictedQueuedBytes(statusAnalytics.getNextIntervalBytes(conn.getIdentifier())); + connStatus.setNextPredictedQueuedCount(statusAnalytics.getNextIntervalCount(conn.getIdentifier())); } if (isConnectionAuthorized) {
