This is an automated email from the ASF dual-hosted git repository. aichrist pushed a commit to branch analytics-framework in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 4fbdb487334381c36af84a14881bf837409e0f99 Author: Andrew I. Christianson <[email protected]> AuthorDate: Tue Jul 9 14:15:30 2019 -0400 NIFI-6510 Implement initial analytic engine --- .../org/apache/nifi/controller/FlowController.java | 14 +++++ .../status/analytics/StatusAnalyticEngine.java | 60 ++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 331e73e..4c0288f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -117,6 +117,7 @@ import org.apache.nifi.controller.service.StandardConfigurationContext; import org.apache.nifi.controller.service.StandardControllerServiceProvider; import org.apache.nifi.controller.state.manager.StandardStateManagerProvider; import org.apache.nifi.controller.state.server.ZooKeeperStateServer; +import org.apache.nifi.controller.status.analytics.StatusAnalyticEngine; import org.apache.nifi.controller.status.history.ComponentStatusRepository; import org.apache.nifi.controller.status.history.GarbageCollectionHistory; import org.apache.nifi.controller.status.history.GarbageCollectionStatus; @@ -601,6 +602,19 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node } }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS); + StatusAnalyticEngine analyticsEngine = new StatusAnalyticEngine(this, componentStatusRepository); + + timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + analyticsEngine.getMinTimeToBackpressure(); + } 
catch (final Exception e) { + LOG.error("Failed to capture component stats for Stats History", e); + } + } + }, 1000, 1000, TimeUnit.MILLISECONDS); //FIXME use a real/configured interval + this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED); heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java new file mode 100644 index 0000000..8b69ebf --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.controller.status.analytics;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.controller.status.history.StatusHistoryUtil;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Initial analytics engine that walks the status history of every connection
+ * reachable from the root process group.
+ *
+ * <p>At this stage the engine only logs the metrics it reads; no estimate is
+ * derived from them yet.</p>
+ */
+public class StatusAnalyticEngine {
+    private static final Logger LOG = LoggerFactory.getLogger(StatusAnalyticEngine.class);
+
+    private final ComponentStatusRepository statusRepository;
+    private final FlowController controller;
+
+    /**
+     * Creates an engine bound to the given controller and status repository.
+     *
+     * @param controller controller used to look up the root process group
+     * @param statusRepository repository queried for each connection's status history
+     */
+    public StatusAnalyticEngine(FlowController controller, ComponentStatusRepository statusRepository) {
+        this.controller = controller;
+        this.statusRepository = statusRepository;
+    }
+
+    /**
+     * Reads the recorded status history of every connection in the flow and
+     * logs each metric of each aggregate snapshot at DEBUG level.
+     *
+     * <p>Logging is parameterized and kept at DEBUG because this method emits
+     * one line per metric, per snapshot, per connection on every call.</p>
+     *
+     * @return currently always {@code 0}; no time-to-backpressure estimate is computed yet
+     */
+    public long getMinTimeToBackpressure() {
+        final ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
+        final List<Connection> allConnections = rootGroup.findAllConnections();
+
+        for (final Connection conn : allConnections) {
+            final String connectionId = conn.getIdentifier();
+            LOG.debug("Getting connection history for: {}", connectionId);
+
+            // No start/end bound and no practical limit on data points: the full retained history is requested.
+            final StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(
+                    statusRepository.getConnectionStatusHistory(connectionId, null, null, Integer.MAX_VALUE));
+
+            final List<StatusSnapshotDTO> snapshots = connHistory.getAggregateSnapshots();
+            if (snapshots == null) {
+                // NOTE(review): DTO collections may be unset; skip rather than fail the whole pass.
+                continue;
+            }
+
+            for (final StatusSnapshotDTO snap : snapshots) {
+                final Map<String, Long> metrics = snap.getStatusMetrics();
+                if (metrics == null) {
+                    continue;
+                }
+                for (final Entry<String, Long> snapEntry : metrics.entrySet()) {
+                    LOG.debug("Snap {}: {}", snapEntry.getKey(), snapEntry.getValue());
+                }
+            }
+        }
+
+        // Placeholder result until an actual prediction is implemented.
+        return 0;
+    }
+}
