This is an automated email from the ASF dual-hosted git repository. aichrist pushed a commit to branch analytics-framework in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 7a8d13b2d46c923f6ca59624f82eaa567485d1cb Author: Matthew Burgess <[email protected]> AuthorDate: Tue Jul 23 16:47:29 2019 -0400 NIFI-6510 Split StatusAnalytics interface into Engine and per-Connection versions --- .../analytics/ConnectionStatusAnalytics.java | 28 ++-------- .../status/analytics/StatusAnalytics.java | 22 ++------ .../status/analytics/StatusAnalyticsEngine.java | 22 ++++++++ .../org/apache/nifi/controller/FlowController.java | 10 ++-- ...=> CachingConnectionStatusAnalyticsEngine.java} | 60 ++++++++++------------ ...e.java => ConnectionStatusAnalyticsEngine.java} | 47 ++--------------- .../apache/nifi/reporting/StandardEventAccess.java | 18 ++++--- .../nifi/web/controller/ControllerFacade.java | 4 +- 8 files changed, 81 insertions(+), 130 deletions(-) diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java index 2380d55..9792ae4 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java @@ -16,31 +16,11 @@ */ package org.apache.nifi.controller.status.analytics; -public interface ConnectionStatusAnalytics { - /** - * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue. - * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue. - */ - long getTimeToBytesBackpressureMillis(); - - /** - * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the number of objects in the queue. - * @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue. - */ - long getTimeToCountBackpressureMillis(); - - /** - * Returns the predicted total number of bytes in the queue to occur at the next configured interval (5 mins in the future, e.g.). - * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue. - */ - long getNextIntervalBytes(); - - /** - * Returns the predicted number of objects in the queue to occur at the next configured interval (5 mins in the future, e.g.). - * @return milliseconds until backpressure is predicted to occur, based on the number of bytes in the queue. - */ - int getNextIntervalCount(); +/** + * The ConnectionStatusAnalytics interface offers additional methods to the StatusAnalytics interface related to the supporting connection information (group ID, e.g.) + */ +public interface ConnectionStatusAnalytics extends StatusAnalytics{ String getGroupId(); String getId(); diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java index 45e1c12..564f1c9 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java @@ -17,43 +17,31 @@ package org.apache.nifi.controller.status.analytics; /** - * StatusAnalytics + * The StatusAnalytics interface offers methods for accessing predicted and other values for a single component (Connection instance, e.g.) */ public interface StatusAnalytics { /** - * Returns a ConnectionStatusAnalytics object containing all relevant metrics and analytical & statistical objects, as well as identity information for the connection. - * - * @param connectionId The unique ID of the connection - * @return A ConnectionStatusAnalytics object - */ - ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId); - - /** * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue. * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue. - * @param connectionId */ - long getTimeToBytesBackpressureMillis(String connectionId); + long getTimeToBytesBackpressureMillis(); /** * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the number of objects in the queue. * @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue. - * @param connectionId */ - long getTimeToCountBackpressureMillis(String connectionId); + long getTimeToCountBackpressureMillis(); /** * Returns the predicted total number of bytes in the queue to occur at the next configured interval (5 mins in the future, e.g.). * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue. - * @param connectionId */ - long getNextIntervalBytes(String connectionId); + long getNextIntervalBytes(); /** * Returns the predicted number of objects in the queue to occur at the next configured interval (5 mins in the future, e.g.). * @return milliseconds until backpressure is predicted to occur, based on the number of bytes in the queue. - * @param connectionId */ - int getNextIntervalCount(String connectionId); + int getNextIntervalCount(); } diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsEngine.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsEngine.java new file mode 100644 index 0000000..5cbc333 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsEngine.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status.analytics; + +public interface StatusAnalyticsEngine { + + ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId); +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 450d944..b5de777 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -150,8 +150,8 @@ import org.apache.nifi.controller.service.StandardConfigurationContext; import org.apache.nifi.controller.service.StandardControllerServiceProvider; import org.apache.nifi.controller.state.manager.StandardStateManagerProvider; import org.apache.nifi.controller.state.server.ZooKeeperStateServer; -import org.apache.nifi.controller.status.analytics.CachingStatusAnalyticEngine; -import org.apache.nifi.controller.status.analytics.StatusAnalytics; +import org.apache.nifi.controller.status.analytics.CachingConnectionStatusAnalyticsEngine; +import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine; import org.apache.nifi.controller.status.history.ComponentStatusRepository; import org.apache.nifi.controller.status.history.GarbageCollectionHistory; import org.apache.nifi.controller.status.history.GarbageCollectionStatus; @@ -351,7 +351,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node // guarded by rwLock private NodeConnectionStatus connectionStatus; - private StatusAnalytics analyticsEngine; + private StatusAnalyticsEngine analyticsEngine; // guarded by rwLock private String instanceId; @@ -594,7 +594,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node } componentStatusRepository = createComponentStatusRepository(); - analyticsEngine = new CachingStatusAnalyticEngine(this, componentStatusRepository); + analyticsEngine = new CachingConnectionStatusAnalyticsEngine(this, componentStatusRepository); eventAccess = new StandardEventAccess(this, flowFileEventRepository); timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { @@ -1379,7 +1379,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node return eventAccess; } - public StatusAnalytics getStatusAnalytics() { + public StatusAnalyticsEngine getStatusAnalyticsEngine() { return analyticsEngine; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java similarity index 79% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java index 864a5d4..c12dbae 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.controller.status.analytics; import java.util.Date; @@ -19,17 +35,17 @@ import org.slf4j.LoggerFactory; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; -public class CachingStatusAnalyticEngine implements StatusAnalytics { +public class CachingConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine { private ComponentStatusRepository statusRepository; private FlowController controller; private volatile Cache<String, SimpleRegression> cache; - private static final Logger LOG = LoggerFactory.getLogger(StatusAnalyticEngine.class); + private static final Logger LOG = LoggerFactory.getLogger(ConnectionStatusAnalyticsEngine.class); - public CachingStatusAnalyticEngine(FlowController controller, ComponentStatusRepository statusRepository) { + public CachingConnectionStatusAnalyticsEngine(FlowController controller, ComponentStatusRepository statusRepository) { this.controller = controller; this.statusRepository = statusRepository; this.cache = Caffeine.newBuilder() - .expireAfterWrite(1,TimeUnit.MINUTES) + .expireAfterWrite(1, TimeUnit.MINUTES) .build(); } @@ -40,45 +56,25 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics { Connection connection = rootGroup.findConnection(connectionId); SimpleRegression cachedRegression = cache.getIfPresent(connection.getIdentifier()); - if(cachedRegression == null) { + if (cachedRegression == null) { cachedRegression = getBackPressureRegressionModel(connection); - if(cachedRegression != null) + if (cachedRegression != null) cache.put(connection.getIdentifier(), cachedRegression); } - ConnectionStatusAnalytics cachedResult = calculate(cachedRegression,connection); + ConnectionStatusAnalytics cachedResult = calculate(cachedRegression, connection); LOG.info("Connection: " + connectionId + " Cached backpressure Time: " + cachedResult.getTimeToCountBackpressureMillis()); return cachedResult; } - @Override - public long getTimeToBytesBackpressureMillis(String connectionId) { - return 0; - } - - @Override - public long getTimeToCountBackpressureMillis(String connectionId) { - return getConnectionStatusAnalytics(connectionId).getTimeToCountBackpressureMillis(); - } - - @Override - public long getNextIntervalBytes(String connectionId) { - return 0; - } - - @Override - public int getNextIntervalCount(String connectionId) { - return 0; - } - - protected ConnectionStatusAnalytics calculate(SimpleRegression regression, Connection conn){ + protected ConnectionStatusAnalytics calculate(SimpleRegression regression, Connection conn) { long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold(); final long connTimeToBackpressure; - if(regression == null){ + if (regression == null) { connTimeToBackpressure = Long.MAX_VALUE; - }else{ + } else { //If calculation returns as negative only 0 will return connTimeToBackpressure = Math.max(0, Math.round((backPressureObjectThreshold - regression.getIntercept()) / regression.getSlope()) - System.currentTimeMillis()); @@ -147,6 +143,7 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics { /** * Get backpressure model based on current data + * * @param conn the connection to run the analytic on * @return */ @@ -177,8 +174,5 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics { return regression; } } - } - - } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java similarity index 77% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java index 5a873d5..11862c8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java @@ -31,13 +31,13 @@ import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class StatusAnalyticEngine implements StatusAnalytics { +public class ConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine { private ComponentStatusRepository statusRepository; private FlowController controller; - private static final Logger LOG = LoggerFactory.getLogger(StatusAnalyticEngine.class); + private static final Logger LOG = LoggerFactory.getLogger(ConnectionStatusAnalyticsEngine.class); - public StatusAnalyticEngine(FlowController controller, ComponentStatusRepository statusRepository) { + public ConnectionStatusAnalyticsEngine(FlowController controller, ComponentStatusRepository statusRepository) { this.controller = controller; this.statusRepository = statusRepository; } @@ -54,7 +54,7 @@ public class StatusAnalyticEngine implements StatusAnalytics { * @return */ public ConnectionStatusAnalytics getConnectionStatusAnalytics(Connection conn) { - LOG.info("Getting connection history for: " + conn.getIdentifier()); + LOG.debug("Getting connection history for: " + conn.getIdentifier()); long connTimeToBackpressure; Date minDate = new Date(System.currentTimeMillis() - (5 * 60 * 1000)); StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO( @@ -67,8 +67,7 @@ public class StatusAnalyticEngine implements StatusAnalytics { } else { long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold(); - LOG.info("Connection " + conn.getIdentifier() + " backpressure object threshold is " - + Long.toString(backPressureObjectThreshold)); + LOG.info("Connection " + conn.getIdentifier() + " backpressure object threshold is " + backPressureObjectThreshold); ConnectionStatusDescriptor.QUEUED_COUNT.getField(); @@ -154,40 +153,4 @@ public class StatusAnalyticEngine implements StatusAnalytics { } }; } - - public long getMinTimeToBackpressureMillis() { - ProcessGroup rootGroup = controller.getFlowManager().getRootGroup(); - List<Connection> allConnections = rootGroup.findAllConnections(); - rootGroup.findConnection("asdf"); - long minTimeToBackpressure = Long.MAX_VALUE; - - for (Connection conn : allConnections) { - ConnectionStatusAnalytics connAnalytics = getConnectionStatusAnalytics(conn); - minTimeToBackpressure = Math.min(minTimeToBackpressure, connAnalytics.getTimeToCountBackpressureMillis()); - } - - LOG.info("Min time to backpressure is: " + Long.toString(minTimeToBackpressure)); - return minTimeToBackpressure; - } - - // TODO - populate the prediction fields. Do we need to pass in connection ID? - @Override - public long getTimeToCountBackpressureMillis(String connectionId) { - return 0; - } - - @Override - public long getTimeToBytesBackpressureMillis(String connectionId) { - return 0; - } - - @Override - public long getNextIntervalBytes(String connectionId) { - return 0; - } - - @Override - public int getNextIntervalCount(String connectionId) { - return 0; - } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java index 87fcd4d..33e650b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java @@ -52,6 +52,7 @@ import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import org.apache.nifi.controller.status.RunStatus; import org.apache.nifi.controller.status.TransmissionStatus; import org.apache.nifi.controller.status.analytics.StatusAnalytics; +import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.history.History; @@ -65,12 +66,12 @@ import org.apache.nifi.remote.RemoteGroupPort; public class StandardEventAccess implements UserAwareEventAccess { private final FlowFileEventRepository flowFileEventRepository; private final FlowController flowController; - private final StatusAnalytics statusAnalytics; + private final StatusAnalyticsEngine statusAnalyticsEngine; public StandardEventAccess(final FlowController flowController, final FlowFileEventRepository flowFileEventRepository) { this.flowController = flowController; this.flowFileEventRepository = flowFileEventRepository; - this.statusAnalytics = flowController.getStatusAnalytics(); + this.statusAnalyticsEngine = flowController.getStatusAnalyticsEngine(); } /** @@ -341,11 +342,14 @@ public class StandardEventAccess implements UserAwareEventAccess { bytesTransferred += connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut(); } - if (statusAnalytics != null) { - connStatus.setPredictedTimeToBytesBackpressureMillis(statusAnalytics.getTimeToBytesBackpressureMillis(conn.getIdentifier())); - connStatus.setPredictedTimeToCountBackpressureMillis(statusAnalytics.getTimeToCountBackpressureMillis(conn.getIdentifier())); - connStatus.setNextPredictedQueuedBytes(statusAnalytics.getNextIntervalBytes(conn.getIdentifier())); - connStatus.setNextPredictedQueuedCount(statusAnalytics.getNextIntervalCount(conn.getIdentifier())); + if (statusAnalyticsEngine != null) { + StatusAnalytics statusAnalytics = statusAnalyticsEngine.getConnectionStatusAnalytics(conn.getIdentifier()); + if (statusAnalytics != null) { + connStatus.setPredictedTimeToBytesBackpressureMillis(statusAnalytics.getTimeToBytesBackpressureMillis()); + connStatus.setPredictedTimeToCountBackpressureMillis(statusAnalytics.getTimeToCountBackpressureMillis()); + connStatus.setNextPredictedQueuedBytes(statusAnalytics.getNextIntervalBytes()); + connStatus.setNextPredictedQueuedCount(statusAnalytics.getNextIntervalCount()); + } } if (isConnectionAuthorized) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index 367ea51..2fcef27 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -57,7 +57,7 @@ import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalytics; -import org.apache.nifi.controller.status.analytics.StatusAnalytics; +import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine; import org.apache.nifi.controller.status.history.ComponentStatusRepository; import org.apache.nifi.diagnostics.SystemDiagnostics; import org.apache.nifi.flowfile.FlowFilePrioritizer; @@ -705,7 +705,7 @@ public class ControllerFacade implements Authorizable { } // get from flow controller - final StatusAnalytics status = flowController.getStatusAnalytics(); + final StatusAnalyticsEngine status = flowController.getStatusAnalyticsEngine(); if (status == null) { throw new ResourceNotFoundException(String.format("Unable to locate connection with id '%s'.", connectionId)); }
