This is an automated email from the ASF dual-hosted git repository. aichrist pushed a commit to branch analytics-framework in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 5ef715e6c3e064f814972030608a50b4a6d3209c Author: Yolanda Davis <[email protected]> AuthorDate: Fri Jul 19 17:28:29 2019 -0400 NIFI-6510 Added poc engine with prediction model caching (cherry picked from commit e013b91) DFA-9 - updated logging and corrected logic for checking if not in backpressure (cherry picked from commit a1f8e70) --- .../nifi-framework/nifi-framework-core/pom.xml | 6 + .../org/apache/nifi/controller/FlowController.java | 77 +++++----- .../analytics/CachingStatusAnalyticEngine.java | 160 +++++++++++++++++++++ .../apache/nifi/reporting/StandardEventAccess.java | 14 +- 4 files changed, 212 insertions(+), 45 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml index 6551d54..2d17086 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml @@ -241,6 +241,12 @@ <version>1.10.0-SNAPSHOT</version> <scope>test</scope> </dependency> + <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + <version>1.0.1</version> + <scope>compile</scope> + </dependency> </dependencies> <build> <plugins> diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 56272ff..462b113 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -16,6 +16,39 @@ */ package org.apache.nifi.controller; +import static java.util.Objects.requireNonNull; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import javax.net.ssl.SSLContext; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; @@ -117,7 +150,7 @@ import org.apache.nifi.controller.service.StandardConfigurationContext; import org.apache.nifi.controller.service.StandardControllerServiceProvider; import org.apache.nifi.controller.state.manager.StandardStateManagerProvider; import org.apache.nifi.controller.state.server.ZooKeeperStateServer; -import org.apache.nifi.controller.status.analytics.StatusAnalyticEngine; +import org.apache.nifi.controller.status.analytics.CachingStatusAnalyticEngine; import org.apache.nifi.controller.status.analytics.StatusAnalytics; import org.apache.nifi.controller.status.history.ComponentStatusRepository; import org.apache.nifi.controller.status.history.GarbageCollectionHistory; @@ -184,38 +217,6 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.management.GarbageCollectorMXBean; -import java.lang.management.ManagementFactory; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; - -import static java.util.Objects.requireNonNull; - public class FlowController implements ReportingTaskProvider, Authorizable, NodeTypeProvider { // default repository implementations @@ -350,7 +351,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node // guarded by rwLock private NodeConnectionStatus connectionStatus; - private StatusAnalyticEngine analyticsEngine; + private CachingStatusAnalyticEngine analyticsEngine; // guarded by rwLock private String instanceId; @@ -605,18 +606,18 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node } }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS); - analyticsEngine = new StatusAnalyticEngine(this, componentStatusRepository); + analyticsEngine = new CachingStatusAnalyticEngine(this, componentStatusRepository); timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { - analyticsEngine.getMinTimeToBackpressureMillis(); + analyticsEngine.refreshModel(); } catch (final Exception e) { - LOG.error("Failed to capture component stats for Stats History", e); + LOG.error("Failed to refresh model:", e); } } - }, 1000, 1000, TimeUnit.MILLISECONDS); //FIXME use a real/configured interval (or maybe just compute on the fly when requested) + }, 1, 1, TimeUnit.MINUTES); //FIXME use a real/configured interval (or maybe just compute on the fly when requested) this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED); heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java new file mode 100644 index 0000000..5241c4a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java @@ -0,0 +1,160 @@ +package org.apache.nifi.controller.status.analytics; + +import java.util.Date; +import java.util.List; + +import org.apache.commons.math3.stat.regression.SimpleRegression; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.status.history.ComponentStatusRepository; +import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor; +import org.apache.nifi.controller.status.history.StatusHistoryUtil; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; +import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; + +public class CachingStatusAnalyticEngine implements StatusAnalytics { + private ComponentStatusRepository statusRepository; + private FlowController controller; + private volatile Cache<String, SimpleRegression> cache; + private static final Logger LOG = LoggerFactory.getLogger(StatusAnalyticEngine.class); + + public CachingStatusAnalyticEngine(FlowController controller, ComponentStatusRepository statusRepository) { + this.controller = controller; + this.statusRepository = statusRepository; + this.cache = Caffeine.newBuilder() + .build(); + } + + @Override + public ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId) { + + ProcessGroup rootGroup = controller.getFlowManager().getRootGroup(); + Connection connection = rootGroup.findConnection(connectionId); + SimpleRegression cachedRegression = cache.getIfPresent(connection.getIdentifier()); + + if(cachedRegression != null) { + cache.put(connection.getIdentifier(), cachedRegression); + } + + ConnectionStatusAnalytics cachedResult = calculate(cachedRegression,connection); + LOG.info("Connection: " + connectionId + " Cached backpressure Time: " + cachedResult.getMinTimeToBackpressureMillis() ); + return cachedResult; + } + + protected ConnectionStatusAnalytics calculate(SimpleRegression regression, Connection conn){ + long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold(); + + final long connTimeToBackpressure; + + if(regression == null){ + connTimeToBackpressure = Long.MAX_VALUE; + }else{ + //If calculation returns as negative only 0 will return + connTimeToBackpressure = Math.max(0, Math.round((backPressureObjectThreshold - regression.getIntercept()) / regression.getSlope()) + - System.currentTimeMillis()); + } + + return new ConnectionStatusAnalytics() { + + @Override + public String getSourceName() { + return conn.getSource().getName(); + } + + @Override + public String getSourceId() { + return conn.getSource().getIdentifier(); + } + + @Override + public String getName() { + return conn.getName(); + } + + @Override + public long getMinTimeToBackpressureMillis() { + return connTimeToBackpressure; + } + + @Override + public String getId() { + return conn.getIdentifier(); + } + + @Override + public String getGroupId() { + return conn.getProcessGroup().getIdentifier(); + } + + @Override + public String getDestinationName() { + return conn.getDestination().getName(); + } + + @Override + public String getDestinationId() { + return conn.getDestination().getIdentifier(); + } + }; + + } + + /** + * Get backpressure model based on current data + * @param conn the connection to run the analytic on + * @return + */ + protected SimpleRegression getBackPressureRegressionModel(Connection conn) { + Date minDate = new Date(System.currentTimeMillis() - (5 * 60 * 1000)); + StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO( + statusRepository.getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE)); + List<StatusSnapshotDTO> aggregateSnapshots = connHistory.getAggregateSnapshots(); + + if (aggregateSnapshots.size() < 2) { + LOG.info("Not enough data to model time to backpressure."); + return null; + } else { + + ConnectionStatusDescriptor.QUEUED_COUNT.getField(); + SimpleRegression regression = new SimpleRegression(); + + for (StatusSnapshotDTO snap : aggregateSnapshots) { + Long snapQueuedCount = snap.getStatusMetrics().get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()); + long snapTime = snap.getTimestamp().getTime(); + regression.addData(snapTime, snapQueuedCount); + LOG.info("Connection " + conn.getIdentifier() + " statistics: ("+snapTime+","+snapQueuedCount+")"); + } + + if (regression.getSlope() <= 0 && !conn.getFlowFileQueue().isFull()) { + LOG.info("Connection " + conn.getIdentifier() + " is not experiencing backpressure."); + return null; + } else { + return regression; + } + } + + } + + public void refreshModel() { + ProcessGroup rootGroup = controller.getFlowManager().getRootGroup(); + List<Connection> allConnections = rootGroup.findAllConnections(); + cache.invalidateAll(); + for (Connection conn : allConnections) { + SimpleRegression regression = getBackPressureRegressionModel(conn); + if(regression != null) { + cache.put(conn.getIdentifier(), regression); + } + } + } + + @Override + public long getMinTimeToBackpressureMillis() { + return 0; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java index 095ddf8..0b4b73c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java @@ -16,6 +16,13 @@ */ package org.apache.nifi.reporting; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + import org.apache.commons.collections4.Predicate; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; @@ -54,13 +61,6 @@ import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.remote.PublicPort; import org.apache.nifi.remote.RemoteGroupPort; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - public class StandardEventAccess implements UserAwareEventAccess { private final FlowFileEventRepository flowFileEventRepository; private final FlowController flowController;
