This is an automated email from the ASF dual-hosted git repository. aichrist pushed a commit to branch analytics-framework in repository https://gitbox.apache.org/repos/asf/nifi.git
commit e1f5296d049fcab1b94e1f6cc08f0fe57322f9f1 Author: Matthew Burgess <[email protected]> AuthorDate: Mon Jul 15 11:56:55 2019 -0400 NIFI-6510 Initial analytics REST endpoint and supporting objects --- .../status/analytics/StatusAnalytics.java | 21 +++ .../api/dto/status/ConnectionStatisticsDTO.java | 161 +++++++++++++++++++++ .../status/ConnectionStatisticsSnapshotDTO.java | 149 +++++++++++++++++++ .../NodeConnectionStatisticsSnapshotDTO.java | 78 ++++++++++ .../web/api/entity/ConnectionStatisticsEntity.java | 55 +++++++ .../entity/ConnectionStatisticsSnapshotEntity.java | 76 ++++++++++ .../org/apache/nifi/controller/FlowController.java | 3 +- .../status/analytics/StatusAnalyticEngine.java | 3 +- .../org/apache/nifi/web/NiFiServiceFacade.java | 9 ++ .../apache/nifi/web/StandardNiFiServiceFacade.java | 10 ++ .../java/org/apache/nifi/web/api/FlowResource.java | 74 ++++++++++ .../org/apache/nifi/web/api/dto/DtoFactory.java | 28 ++++ .../org/apache/nifi/web/api/dto/EntityFactory.java | 19 +++ .../nifi/web/controller/ControllerFacade.java | 32 ++++ 14 files changed, 716 insertions(+), 2 deletions(-) diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java new file mode 100644 index 0000000..d6ad3bc --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status.analytics; + +public interface StatusAnalytics { + long getMinTimeToBackpressureMillis(); +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsDTO.java new file mode 100644 index 0000000..79ae947 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsDTO.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.web.api.dto.status; + +import io.swagger.annotations.ApiModelProperty; +import org.apache.nifi.web.api.dto.util.TimeAdapter; + +import javax.xml.bind.annotation.XmlType; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +@XmlType(name = "connectionStatistics") +public class ConnectionStatisticsDTO implements Cloneable { + private String id; + private String groupId; + private String name; + private Date statsLastRefreshed; + + private String sourceId; + private String sourceName; + private String destinationId; + + private String destinationName; + private ConnectionStatisticsSnapshotDTO aggregateSnapshot; + + private List<NodeConnectionStatisticsSnapshotDTO> nodeSnapshots; + + @ApiModelProperty("The ID of the connection") + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + @ApiModelProperty("The ID of the Process Group that the connection belongs to") + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + @ApiModelProperty("The name of the connection") + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @ApiModelProperty("The ID of the source component") + public String getSourceId() { + return sourceId; + } + + public void setSourceId(String sourceId) { + this.sourceId = sourceId; + } + + @ApiModelProperty("The name of the source component") + public String getSourceName() { + return sourceName; + } + + public void setSourceName(String sourceName) { + this.sourceName = sourceName; + } + + @ApiModelProperty("The ID of the destination component") + public String getDestinationId() { + return destinationId; + } + + public void setDestinationId(String destinationId) { + this.destinationId = destinationId; + } + + @ApiModelProperty("The name of the destination 
component") + public String getDestinationName() { + return destinationName; + } + + public void setDestinationName(String destinationName) { + this.destinationName = destinationName; + } + + @ApiModelProperty("The status snapshot that represents the aggregate stats of the cluster") + public ConnectionStatisticsSnapshotDTO getAggregateSnapshot() { + return aggregateSnapshot; + } + + public void setAggregateSnapshot(ConnectionStatisticsSnapshotDTO aggregateSnapshot) { + this.aggregateSnapshot = aggregateSnapshot; + } + + @ApiModelProperty("A list of status snapshots for each node") + public List<NodeConnectionStatisticsSnapshotDTO> getNodeSnapshots() { + return nodeSnapshots; + } + + public void setNodeSnapshots(List<NodeConnectionStatisticsSnapshotDTO> nodeSnapshots) { + this.nodeSnapshots = nodeSnapshots; + } + + @XmlJavaTypeAdapter(TimeAdapter.class) + @ApiModelProperty( + value = "The timestamp of when the stats were last refreshed", + dataType = "string" + ) + public Date getStatsLastRefreshed() { + return statsLastRefreshed; + } + + public void setStatsLastRefreshed(Date statsLastRefreshed) { + this.statsLastRefreshed = statsLastRefreshed; + } + + @Override + public ConnectionStatisticsDTO clone() { + final ConnectionStatisticsDTO other = new ConnectionStatisticsDTO(); + other.setDestinationId(getDestinationId()); + other.setDestinationName(getDestinationName()); + other.setGroupId(getGroupId()); + other.setId(getId()); + other.setName(getName()); + other.setSourceId(getSourceId()); + other.setSourceName(getSourceName()); + other.setAggregateSnapshot(getAggregateSnapshot().clone()); + + + final List<NodeConnectionStatisticsSnapshotDTO> nodeStatuses = getNodeSnapshots(); + final List<NodeConnectionStatisticsSnapshotDTO> nodeStatusClones = new ArrayList<>(nodeStatuses.size()); + for (final NodeConnectionStatisticsSnapshotDTO nodeStatus : nodeStatuses) { + nodeStatusClones.add(nodeStatus.clone()); + } + other.setNodeSnapshots(nodeStatusClones); + + return other; 
+ } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java new file mode 100644 index 0000000..e914f74 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.dto.status; + +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlType; + +/** + * DTO for serializing the statistics of a connection. 
+ */ +@XmlType(name = "connectionStatisticsSnapshot") +public class ConnectionStatisticsSnapshotDTO implements Cloneable { + + private String id; + private String groupId; + private String name; + + private String sourceId; + private String sourceName; + private String destinationId; + private String destinationName; + + private Long predictedMillisUntilBackpressure = 0L; + + /* getters / setters */ + /** + * @return The connection id + */ + @ApiModelProperty("The id of the connection.") + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + /** + * @return the ID of the Process Group to which this connection belongs. + */ + @ApiModelProperty("The id of the process group the connection belongs to.") + public String getGroupId() { + return groupId; + } + + public void setGroupId(final String groupId) { + this.groupId = groupId; + } + + /** + * @return name of this connection + */ + @ApiModelProperty("The name of the connection.") + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + /** + * @return id of the source of this connection + */ + @ApiModelProperty("The id of the source of the connection.") + public String getSourceId() { + return sourceId; + } + + public void setSourceId(String sourceId) { + this.sourceId = sourceId; + } + + /** + * @return name of the source of this connection + */ + @ApiModelProperty("The name of the source of the connection.") + public String getSourceName() { + return sourceName; + } + + public void setSourceName(String sourceName) { + this.sourceName = sourceName; + } + + /** + * @return id of the destination of this connection + */ + @ApiModelProperty("The id of the destination of the connection.") + public String getDestinationId() { + return destinationId; + } + + public void setDestinationId(String destinationId) { + this.destinationId = destinationId; + } + + /** + * @return name of the destination of this connection + */ + 
@ApiModelProperty("The name of the destination of the connection.") + public String getDestinationName() { + return destinationName; + } + + public void setDestinationName(String destinationName) { + this.destinationName = destinationName; + } + + @ApiModelProperty("The predicted number of milliseconds before the connection will have backpressure applied.") + public Long getPredictedMillisUntilBackpressure() { + return predictedMillisUntilBackpressure; + } + + public void setPredictedMillisUntilBackpressure(Long predictedMillisUntilBackpressure) { + this.predictedMillisUntilBackpressure = predictedMillisUntilBackpressure; + } + + @Override + public ConnectionStatisticsSnapshotDTO clone() { + final ConnectionStatisticsSnapshotDTO other = new ConnectionStatisticsSnapshotDTO(); + other.setDestinationId(getDestinationId()); + other.setDestinationName(getDestinationName()); + other.setGroupId(getGroupId()); + other.setId(getId()); + other.setName(getName()); + other.setSourceId(getSourceId()); + other.setSourceName(getSourceName()); + + other.setPredictedMillisUntilBackpressure(getPredictedMillisUntilBackpressure()); + + return other; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/NodeConnectionStatisticsSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/NodeConnectionStatisticsSnapshotDTO.java new file mode 100644 index 0000000..76f94ec --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/NodeConnectionStatisticsSnapshotDTO.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.dto.status; + +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlType; + +@XmlType(name = "nodeConnectionStatisticsSnapshot") +public class NodeConnectionStatisticsSnapshotDTO implements Cloneable { + private String nodeId; + private String address; + private Integer apiPort; + + private ConnectionStatisticsSnapshotDTO statisticsSnapshot; + + @ApiModelProperty("The unique ID that identifies the node") + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + @ApiModelProperty("The API address of the node") + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + @ApiModelProperty("The API port used to communicate with the node") + public Integer getApiPort() { + return apiPort; + } + + public void setApiPort(Integer apiPort) { + this.apiPort = apiPort; + } + + @ApiModelProperty("The connection status snapshot from the node.") + public ConnectionStatisticsSnapshotDTO getStatisticsSnapshot() { + return statisticsSnapshot; + } + + public void setStatisticsSnapshot(ConnectionStatisticsSnapshotDTO statisticsSnapshot) { + this.statisticsSnapshot = statisticsSnapshot; + } + + @Override + public NodeConnectionStatisticsSnapshotDTO clone() { + final 
NodeConnectionStatisticsSnapshotDTO other = new NodeConnectionStatisticsSnapshotDTO(); + other.setNodeId(getNodeId()); + other.setAddress(getAddress()); + other.setApiPort(getApiPort()); + other.setStatisticsSnapshot(getStatisticsSnapshot().clone()); + return other; + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsEntity.java new file mode 100644 index 0000000..781cff6 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsEntity.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import org.apache.nifi.web.api.dto.ReadablePermission; +import org.apache.nifi.web.api.dto.status.ConnectionStatisticsDTO; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. 
This particular entity holds a reference to a ConnectionStatisticsDTO. + */ +@XmlRootElement(name = "connectionStatisticsEntity") +public class ConnectionStatisticsEntity extends Entity implements ReadablePermission { + + private ConnectionStatisticsDTO connectionStatistics; + private Boolean canRead; + + /** + * The ConnectionStatisticsDTO that is being serialized. + * + * @return The ConnectionStatisticsDTO object + */ + public ConnectionStatisticsDTO getConnectionStatistics() { + return connectionStatistics; + } + + public void setConnectionStatistics(ConnectionStatisticsDTO connectionStatistics) { + this.connectionStatistics = connectionStatistics; + } + + @Override + public Boolean getCanRead() { + return canRead; + } + + @Override + public void setCanRead(Boolean canRead) { + this.canRead = canRead; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java new file mode 100644 index 0000000..da7e5ca --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import io.swagger.annotations.ApiModelProperty; +import org.apache.nifi.web.api.dto.ReadablePermission; +import org.apache.nifi.web.api.dto.status.ConnectionStatisticsSnapshotDTO; +import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO; + +/** + * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. + * This particular entity holds a reference to a ConnectionStatisticsSnapshotDTO. + */ +public class ConnectionStatisticsSnapshotEntity extends Entity implements ReadablePermission, Cloneable { + private String id; + private ConnectionStatisticsSnapshotDTO connectionStatisticsSnapshot; + private Boolean canRead; + + /** + * @return The connection id + */ + @ApiModelProperty("The id of the connection.") + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + /** + * The ConnectionStatisticsSnapshotDTO that is being serialized. 
+ * + * @return The ConnectionStatisticsSnapshotDTO object + */ + public ConnectionStatisticsSnapshotDTO getConnectionStatisticsSnapshot() { + return connectionStatisticsSnapshot; + } + + public void setConnectionStatisticsSnapshot(ConnectionStatisticsSnapshotDTO connectionStatusSnapshot) { + this.connectionStatisticsSnapshot = connectionStatusSnapshot; + } + + @Override + public Boolean getCanRead() { + return canRead; + } + + @Override + public void setCanRead(Boolean canRead) { + this.canRead = canRead; + } + + @Override + public ConnectionStatisticsSnapshotEntity clone() { + final ConnectionStatisticsSnapshotEntity other = new ConnectionStatisticsSnapshotEntity(); + other.setCanRead(this.getCanRead()); + other.setConnectionStatisticsSnapshot(this.getConnectionStatisticsSnapshot().clone()); + + return other; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index f7ed734..0c422b4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -118,6 +118,7 @@ import org.apache.nifi.controller.service.StandardControllerServiceProvider; import org.apache.nifi.controller.state.manager.StandardStateManagerProvider; import org.apache.nifi.controller.state.server.ZooKeeperStateServer; import org.apache.nifi.controller.status.analytics.StatusAnalyticEngine; +import org.apache.nifi.controller.status.analytics.StatusAnalytics; import org.apache.nifi.controller.status.history.ComponentStatusRepository; import org.apache.nifi.controller.status.history.GarbageCollectionHistory; import 
org.apache.nifi.controller.status.history.GarbageCollectionStatus; @@ -602,7 +603,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node } }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS); - StatusAnalyticEngine analyticsEngine = new StatusAnalyticEngine(this, componentStatusRepository); + StatusAnalytics analyticsEngine = new StatusAnalyticEngine(this, componentStatusRepository); timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java index 0602a93..9231707 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java @@ -31,7 +31,7 @@ import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class StatusAnalyticEngine { +public class StatusAnalyticEngine implements StatusAnalytics { private ComponentStatusRepository statusRepository; private FlowController controller; @@ -42,6 +42,7 @@ public class StatusAnalyticEngine { this.statusRepository = statusRepository; } + @Override public long getMinTimeToBackpressureMillis() { ProcessGroup rootGroup = controller.getFlowManager().getRootGroup(); List<Connection> allConnections = rootGroup.findAllConnections(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index f62bec2..9ed8808 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -87,6 +87,7 @@ import org.apache.nifi.web.api.entity.BucketEntity; import org.apache.nifi.web.api.entity.BulletinEntity; import org.apache.nifi.web.api.entity.ComponentValidationResultEntity; import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ConnectionStatisticsEntity; import org.apache.nifi.web.api.entity.ConnectionStatusEntity; import org.apache.nifi.web.api.entity.ControllerBulletinsEntity; import org.apache.nifi.web.api.entity.ControllerConfigurationEntity; @@ -662,6 +663,14 @@ public interface NiFiServiceFacade { StatusHistoryEntity getConnectionStatusHistory(String connectionId); /** + * Gets analytical statistics for the specified connection. + * + * @param connectionId connection + * @return statistics + */ + ConnectionStatisticsEntity getConnectionStatistics(String connectionId); + + /** * Creates a new Relationship target. 
* * @param revision revision diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 5ba1da6..f2e7608 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -216,6 +216,7 @@ import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO; import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO; import org.apache.nifi.web.api.dto.search.SearchResultsDTO; +import org.apache.nifi.web.api.dto.status.ConnectionStatisticsDTO; import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO; @@ -235,6 +236,7 @@ import org.apache.nifi.web.api.entity.BulletinEntity; import org.apache.nifi.web.api.entity.ComponentReferenceEntity; import org.apache.nifi.web.api.entity.ComponentValidationResultEntity; import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ConnectionStatisticsEntity; import org.apache.nifi.web.api.entity.ConnectionStatusEntity; import org.apache.nifi.web.api.entity.ControllerBulletinsEntity; import org.apache.nifi.web.api.entity.ControllerConfigurationEntity; @@ -3191,6 +3193,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return entityFactory.createStatusHistoryEntity(dto, permissions); } + @Override + public ConnectionStatisticsEntity getConnectionStatistics(final String connectionId) { + final 
Connection connection = connectionDAO.getConnection(connectionId); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection); + final ConnectionStatisticsDTO dto = dtoFactory.createConnectionStatisticsDto(controllerFacade.getConnectionStatistics(connectionId)); + return entityFactory.createConnectionStatisticsEntity(dto, permissions); + } + private ProcessorEntity createProcessorEntity(final ProcessorNode processor, final NiFiUser user) { final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier())); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor, user); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java index 4b34f39..6f28d44 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java @@ -75,6 +75,7 @@ import org.apache.nifi.web.api.entity.BulletinBoardEntity; import org.apache.nifi.web.api.entity.ClusteSummaryEntity; import org.apache.nifi.web.api.entity.ClusterSearchResultsEntity; import org.apache.nifi.web.api.entity.ComponentHistoryEntity; +import org.apache.nifi.web.api.entity.ConnectionStatisticsEntity; import org.apache.nifi.web.api.entity.ConnectionStatusEntity; import org.apache.nifi.web.api.entity.ControllerBulletinsEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; @@ -2074,6 +2075,79 @@ public class FlowResource extends ApplicationResource { return generateOkResponse(entity).build(); } + /** + * Retrieves the specified connection statistics. 
+ * + * @param id The id of the connection whose statistics to retrieve. + * @return A ConnectionStatisticsEntity. + * @throws InterruptedException if interrupted + */ + @GET + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("connections/{id}/statistics") + @ApiOperation( + value = "Gets statistics for a connection", + response = ConnectionStatisticsEntity.class, + authorizations = { + @Authorization(value = "Read - /flow") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response getConnectionStatistics( + @ApiParam( + value = "Whether or not to include the breakdown per node. 
Optional, defaults to false", + required = false + ) + @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise, + @ApiParam( + value = "The id of the node where to get the statistics.", + required = false + ) + @QueryParam("clusterNodeId") String clusterNodeId, + @ApiParam( + value = "The connection id.", + required = true + ) + @PathParam("id") String id) throws InterruptedException { + + authorizeFlow(); + + // ensure a valid request + if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) { + throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node."); + } + + if (isReplicateRequest()) { + // determine where this request should be sent + if (clusterNodeId == null) { + final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET); + final ConnectionStatisticsEntity entity = (ConnectionStatisticsEntity) nodeResponse.getUpdatedEntity(); + + // ensure there is an updated entity (result of merging) and prune the response as necessary + if (entity != null && !nodewise) { + entity.getConnectionStatistics().setNodeSnapshots(null); + } + + return nodeResponse.getResponse(); + } else { + return replicate(HttpMethod.GET, clusterNodeId); + } + } + + // get the specified connection statistics + final ConnectionStatisticsEntity entity = serviceFacade.getConnectionStatistics(id); + return generateOkResponse(entity).build(); + } + // -------------- // status history // -------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 97b8c5e..6903e44 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -105,6 +105,7 @@ import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.analytics.StatusAnalytics;
 import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
 import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
 import org.apache.nifi.diagnostics.GarbageCollection;
@@ -198,6 +199,8 @@ import org.apache.nifi.web.api.dto.provenance.lineage.LineageRequestDTO.LineageR
 import org.apache.nifi.web.api.dto.provenance.lineage.LineageResultsDTO;
 import org.apache.nifi.web.api.dto.provenance.lineage.ProvenanceLinkDTO;
 import org.apache.nifi.web.api.dto.provenance.lineage.ProvenanceNodeDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatisticsDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatisticsSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.PortStatusDTO;
@@ -1186,6 +1189,31 @@ public final class DtoFactory {
         return connectionStatusDto;
     }
 
+    /** Builds the statistics DTO, together with its aggregate snapshot, from a connection's analytics. */
+    public ConnectionStatisticsDTO createConnectionStatisticsDto(final StatusAnalytics connectionStatistics) {
+        final ConnectionStatisticsDTO dto = new ConnectionStatisticsDTO();
+        dto.setId(connectionStatistics.getId());
+        dto.setGroupId(connectionStatistics.getGroupId());
+        dto.setName(connectionStatistics.getName());
+        dto.setSourceId(connectionStatistics.getSourceId());
+        dto.setSourceName(connectionStatistics.getSourceName());
+        dto.setDestinationId(connectionStatistics.getDestinationId());
+        dto.setDestinationName(connectionStatistics.getDestinationName());
+        dto.setStatsLastRefreshed(new Date());
+
+        // the aggregate snapshot repeats the identifying fields and carries the back-pressure prediction
+        final ConnectionStatisticsSnapshotDTO aggregate = new ConnectionStatisticsSnapshotDTO();
+        aggregate.setId(connectionStatistics.getId());
+        aggregate.setGroupId(connectionStatistics.getGroupId());
+        aggregate.setName(connectionStatistics.getName());
+        aggregate.setSourceName(connectionStatistics.getSourceName());
+        aggregate.setDestinationName(connectionStatistics.getDestinationName());
+        aggregate.setPredictedMillisUntilBackpressure(connectionStatistics.getMinTimeToBackpressureMillis());
+        dto.setAggregateSnapshot(aggregate);
+
+        return dto;
+    }
+
     public ProcessorStatusDTO createProcessorStatusDto(final ProcessorStatus procStatus) {
         final ProcessorStatusDTO dto = new ProcessorStatusDTO();
         dto.setId(procStatus.getId());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
index 915ad2c..1db0adc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
@@ -20,6 +20,8 @@ import org.apache.nifi.web.api.dto.action.ActionDTO;
 import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO;
 import org.apache.nifi.web.api.dto.flow.FlowBreadcrumbDTO;
 import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatisticsDTO;
+import 
org.apache.nifi.web.api.dto.status.ConnectionStatisticsSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.ControllerServiceStatusDTO;
@@ -43,6 +45,8 @@ import org.apache.nifi.web.api.entity.BulletinEntity;
 import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
 import org.apache.nifi.web.api.entity.ComponentValidationResultEntity;
 import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ConnectionStatisticsEntity;
+import org.apache.nifi.web.api.entity.ConnectionStatisticsSnapshotEntity;
 import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
 import org.apache.nifi.web.api.entity.ConnectionStatusSnapshotEntity;
 import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
@@ -137,6 +141,21 @@ public final class EntityFactory {
         return entity;
     }
 
+    public ConnectionStatisticsEntity createConnectionStatisticsEntity(final ConnectionStatisticsDTO statistics, final PermissionsDTO permissions) {
+        final ConnectionStatisticsEntity statisticsEntity = new ConnectionStatisticsEntity();
+        statisticsEntity.setConnectionStatistics(statistics); // statistics are always allowed, so always attach them
+        statisticsEntity.setCanRead(permissions.getCanRead()); // permission context is only needed for merging responses
+        return statisticsEntity;
+    }
+
+    public ConnectionStatisticsSnapshotEntity createConnectionStatisticsSnapshotEntity(final ConnectionStatisticsSnapshotDTO statistics, final PermissionsDTO permissions) {
+        final ConnectionStatisticsSnapshotEntity snapshotEntity = new ConnectionStatisticsSnapshotEntity();
+        snapshotEntity.setConnectionStatisticsSnapshot(statistics); // statistics are always allowed, so always attach them
+        snapshotEntity.setCanRead(permissions.getCanRead()); // permission context is only needed for merging responses
+        snapshotEntity.setId(statistics.getId());
+        return snapshotEntity;
+    }
+
     public ProcessGroupStatusEntity createProcessGroupStatusEntity(final ProcessGroupStatusDTO status, final PermissionsDTO permissions) {
         final ProcessGroupStatusEntity entity = new ProcessGroupStatusEntity();
         entity.setCanRead(permissions.getCanRead());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index e560516..c1b6754 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -56,6 +56,7 @@ import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.analytics.StatusAnalytics;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.diagnostics.SystemDiagnostics;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
@@ -681,6 +682,37 @@ public class ControllerFacade implements Authorizable {
     }
 
     /**
+     * Gets analytical statistics for the specified connection.
+     *
+     * @param connectionId connection id
+     * @return the statistics for the specified connection
+     * @throws ResourceNotFoundException if the connection, its process group status, or its statistics cannot be located
+     */
+    public StatusAnalytics getConnectionStatistics(final String connectionId) {
+        final ProcessGroup root = getRootGroup();
+        final Connection connection = root.findConnection(connectionId);
+
+        // ensure the connection was found
+        if (connection == null) {
+            throw new ResourceNotFoundException(String.format("Unable to locate connection with id '%s'.", connectionId));
+        }
+
+        // calculate the process group status
+        final String groupId = connection.getProcessGroup().getIdentifier();
+        final ProcessGroupStatus processGroupStatus = flowController.getEventAccess().getGroupStatus(groupId, NiFiUserUtils.getNiFiUser(), 1);
+        if (processGroupStatus == null) {
+            throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
+        }
+
+        // TODO get from flow controller. Until that lookup exists the local is explicitly null: a blank
+        // final local cannot be read before assignment, and null routes every call to the not-found path below.
+        final StatusAnalytics status = null;
+        if (status == null) {
+            throw new ResourceNotFoundException(String.format("Unable to locate connection with id '%s'.", connectionId));
+        }
+
+        return status;
+    }
+
+    /**
      * Gets the status for the specified input port.
      *
      * @param portId input port id
