NIFI-1563: - Implementing component specific endpoints.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/61a7cbea Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/61a7cbea Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/61a7cbea Branch: refs/heads/NIFI-1563 Commit: 61a7cbeaa1e049b24998352689996710433c0629 Parents: a531a3b Author: Matt Gilman <[email protected]> Authored: Fri Mar 18 11:21:40 2016 -0400 Committer: Matt Gilman <[email protected]> Committed: Fri Mar 18 11:21:40 2016 -0400 ---------------------------------------------------------------------- .../controller/status/ProcessGroupStatus.java | 12 - .../web/api/dto/status/ConnectionStatusDTO.java | 26 +- .../nifi/web/api/dto/status/PortStatusDTO.java | 21 ++ .../api/dto/status/ProcessGroupStatusDTO.java | 21 ++ .../status/ProcessGroupStatusSnapshotDTO.java | 20 -- .../web/api/dto/status/ProcessorStatusDTO.java | 46 ++-- .../dto/status/ProcessorStatusSnapshotDTO.java | 13 +- .../dto/status/RemoteProcessGroupStatusDTO.java | 20 ++ .../nifi/web/api/dto/status/StatusMerger.java | 51 +++- .../cluster/manager/impl/WebClusterManager.java | 270 ++++++++++++++++--- .../apache/nifi/controller/FlowController.java | 1 - .../apache/nifi/web/api/ConnectionResource.java | 16 +- .../apache/nifi/web/api/InputPortResource.java | 80 +++--- .../apache/nifi/web/api/OutputPortResource.java | 16 +- .../nifi/web/api/ProcessGroupResource.java | 86 +++--- .../web/api/RemoteProcessGroupResource.java | 16 +- .../org/apache/nifi/web/api/dto/DtoFactory.java | 16 +- .../src/main/webapp/js/nf/canvas/nf-canvas.js | 2 +- .../webapp/js/nf/summary/nf-summary-table.js | 197 +++++++------- 19 files changed, 624 insertions(+), 306 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/61a7cbea/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java index eb0339f..db16954 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java @@ -32,7 +32,6 @@ public class ProcessGroupStatus implements Cloneable { private Long inputContentSize; private Integer outputCount; private Long outputContentSize; - private long creationTimestamp; private Integer activeThreadCount; private Integer queuedCount; private Long queuedContentSize; @@ -132,14 +131,6 @@ public class ProcessGroupStatus implements Cloneable { this.queuedContentSize = queuedContentSize; } - public long getCreationTimestamp() { - return creationTimestamp; - } - - public void setCreationTimestamp(final long creationTimestamp) { - this.creationTimestamp = creationTimestamp; - } - public Integer getActiveThreadCount() { return activeThreadCount; } @@ -249,7 +240,6 @@ public class ProcessGroupStatus implements Cloneable { final ProcessGroupStatus clonedObj = new ProcessGroupStatus(); - clonedObj.creationTimestamp = creationTimestamp; clonedObj.id = id; clonedObj.name = name; clonedObj.outputContentSize = outputContentSize; @@ -332,8 +322,6 @@ public class ProcessGroupStatus implements Cloneable { builder.append(outputCount); builder.append(", outputBytes="); builder.append(outputContentSize); - builder.append(", creationTimestamp="); - builder.append(creationTimestamp); builder.append(", activeThreadCount="); builder.append(activeThreadCount); builder.append(", flowFilesTransferred="); http://git-wip-us.apache.org/repos/asf/nifi/blob/61a7cbea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusDTO.java index 371dd05..84ec774 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusDTO.java @@ -18,24 +18,28 @@ package org.apache.nifi.web.api.dto.status; import java.util.ArrayList; +import java.util.Date; import java.util.List; import javax.xml.bind.annotation.XmlType; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; import com.wordnik.swagger.annotations.ApiModelProperty; +import org.apache.nifi.web.api.dto.util.TimeAdapter; @XmlType(name = "connectionStatus") public class ConnectionStatusDTO implements Cloneable { private String id; private String groupId; private String name; + private Date statsLastRefreshed; private String sourceId; private String sourceName; private String destinationId; private String destinationName; - private ConnectionStatusSnapshotDTO aggregateSnapshot; + private ConnectionStatusSnapshotDTO aggregateStatus; private List<NodeConnectionStatusSnapshotDTO> nodeStatuses; @ApiModelProperty("The ID of the connection") @@ -102,12 +106,12 @@ public class ConnectionStatusDTO implements Cloneable { } @ApiModelProperty("The status snapshot that represents the aggregate stats of the cluster") - public ConnectionStatusSnapshotDTO getAggregateSnapshot() { - return aggregateSnapshot; + public ConnectionStatusSnapshotDTO getAggregateStatus() { + return aggregateStatus; } - public void setAggregateSnapshot(ConnectionStatusSnapshotDTO aggregateSnapshot) { - this.aggregateSnapshot = aggregateSnapshot; + public void setAggregateStatus(ConnectionStatusSnapshotDTO aggregateStatus) { + this.aggregateStatus = aggregateStatus; } @ApiModelProperty("A list of status snapshots for each node") @@ -119,6 +123,16 @@ public class ConnectionStatusDTO implements Cloneable { this.nodeStatuses = nodeStatuses; } + @XmlJavaTypeAdapter(TimeAdapter.class) + @ApiModelProperty("The timestamp of when the stats were last refreshed") + public Date getStatsLastRefreshed() { + return statsLastRefreshed; + } + + public void setStatsLastRefreshed(Date statsLastRefreshed) { + this.statsLastRefreshed = statsLastRefreshed; + } + @Override public ConnectionStatusDTO clone() { final ConnectionStatusDTO other = new ConnectionStatusDTO(); @@ -129,7 +143,7 @@ public class ConnectionStatusDTO implements Cloneable { other.setName(getName()); other.setSourceId(getSourceId()); other.setSourceName(getSourceName()); - other.setAggregateSnapshot(getAggregateSnapshot().clone()); + other.setAggregateStatus(getAggregateStatus().clone()); final List<NodeConnectionStatusSnapshotDTO> nodeStatuses = getNodeStatuses(); final List<NodeConnectionStatusSnapshotDTO> nodeStatusClones = new ArrayList<>(nodeStatuses.size()); http://git-wip-us.apache.org/repos/asf/nifi/blob/61a7cbea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/PortStatusDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/PortStatusDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/PortStatusDTO.java index 4fc9c60..7e0ded5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/PortStatusDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/PortStatusDTO.java @@ -17,11 +17,14 @@ package org.apache.nifi.web.api.dto.status; +import java.util.Date; import java.util.List; import javax.xml.bind.annotation.XmlType; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; import com.wordnik.swagger.annotations.ApiModelProperty; +import org.apache.nifi.web.api.dto.util.TimeAdapter; @XmlType(name = "portStatus") public class PortStatusDTO { @@ -30,6 +33,7 @@ public class PortStatusDTO { private String name; private Boolean transmitting; private String runStatus; + private Date statsLastRefreshed; private PortStatusSnapshotDTO aggregateStatus; private List<NodePortStatusSnapshotDTO> nodeStatuses; @@ -102,4 +106,21 @@ public class PortStatusDTO { public void setNodeStatuses(List<NodePortStatusSnapshotDTO> nodeStatuses) { this.nodeStatuses = nodeStatuses; } + + /** + * When the status for this process group was calculated. + * + * @return The the status was calculated + */ + @XmlJavaTypeAdapter(TimeAdapter.class) + @ApiModelProperty( + value = "The time the status for the process group was last refreshed." + ) + public Date getStatsLastRefreshed() { + return statsLastRefreshed; + } + + public void setStatsLastRefreshed(Date statsLastRefreshed) { + this.statsLastRefreshed = statsLastRefreshed; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/61a7cbea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusDTO.java index d9286e7..c493fc4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusDTO.java @@ -17,16 +17,20 @@ package org.apache.nifi.web.api.dto.status; +import java.util.Date; import java.util.List; import javax.xml.bind.annotation.XmlType; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; import com.wordnik.swagger.annotations.ApiModelProperty; +import org.apache.nifi.web.api.dto.util.TimeAdapter; @XmlType(name = "processGroupStatus") public class ProcessGroupStatusDTO implements Cloneable { private String id; private String name; + private Date statsLastRefreshed; private ProcessGroupStatusSnapshotDTO aggregateStatus; private List<NodeProcessGroupStatusSnapshotDTO> nodeStatuses; @@ -67,4 +71,21 @@ public class ProcessGroupStatusDTO implements Cloneable { public void setNodeStatuses(List<NodeProcessGroupStatusSnapshotDTO> nodeStatuses) { this.nodeStatuses = nodeStatuses; } + + /** + * When the status for this process group was calculated. + * + * @return The the status was calculated + */ + @XmlJavaTypeAdapter(TimeAdapter.class) + @ApiModelProperty( + value = "The time the status for the process group was last refreshed." + ) + public Date getStatsLastRefreshed() { + return statsLastRefreshed; + } + + public void setStatsLastRefreshed(Date statsLastRefreshed) { + this.statsLastRefreshed = statsLastRefreshed; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/61a7cbea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusSnapshotDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusSnapshotDTO.java index cb7efdf..0190bc3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusSnapshotDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusSnapshotDTO.java @@ -75,7 +75,6 @@ public class ProcessGroupStatusSnapshotDTO implements Cloneable { private String sent; private Integer activeThreadCount = 0; - private Date statsLastRefreshed; /** * The id for the process group. @@ -433,24 +432,6 @@ public class ProcessGroupStatusSnapshotDTO implements Cloneable { this.flowFilesReceived = flowFilesReceived; } - /** - * When the status for this process group was calculated. - * - * @return The the status was calculated - */ - @XmlJavaTypeAdapter(TimeAdapter.class) - @ApiModelProperty( - value = "The time the status for the process group was last refreshed." - ) - public Date getStatsLastRefreshed() { - return statsLastRefreshed; - } - - public void setStatsLastRefreshed(Date statsLastRefreshed) { - this.statsLastRefreshed = statsLastRefreshed; - } - - public void setInput(String input) { this.input = input; } @@ -528,7 +509,6 @@ public class ProcessGroupStatusSnapshotDTO implements Cloneable { other.setSent(getSent()); other.setActiveThreadCount(getActiveThreadCount()); - other.setStatsLastRefreshed(getStatsLastRefreshed()); other.setConnectionStatusSnapshots(copy(getConnectionStatusSnapshots())); other.setProcessorStatusSnapshots(copy(getProcessorStatusSnapshots())); http://git-wip-us.apache.org/repos/asf/nifi/blob/61a7cbea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusDTO.java index 1829f73..01d1cdd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusDTO.java @@ -22,8 +22,10 @@ import java.util.Date; import java.util.List; import javax.xml.bind.annotation.XmlType; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; import com.wordnik.swagger.annotations.ApiModelProperty; +import org.apache.nifi.web.api.dto.util.TimeAdapter; /** * DTO for serializing the status of a processor. @@ -32,9 +34,9 @@ import com.wordnik.swagger.annotations.ApiModelProperty; public class ProcessorStatusDTO implements Cloneable { private String groupId; private String id; - private String processorName; - private String processorType; - private String processorRunStatus; + private String name; + private String type; + private String runStatus; private Date statsLastRefreshed; private ProcessorStatusSnapshotDTO aggregateStatus; @@ -59,32 +61,33 @@ public class ProcessorStatusDTO implements Cloneable { } @ApiModelProperty("The name of the Processor") - public String getProcessorName() { - return processorName; + public String getName() { + return name; } - public void setProcessorName(String processorName) { - this.processorName = processorName; + public void setName(String name) { + this.name = name; } @ApiModelProperty("The type of the Processor") - public String getProcessorType() { - return processorType; + public String getType() { + return type; } - public void setProcessorType(String processorType) { - this.processorType = processorType; + public void setType(String type) { + this.type = type; } - @ApiModelProperty("") - public String getProcessorRunStatus() { - return processorRunStatus; + @ApiModelProperty("The run status of the Processor") + public String getRunStatus() { + return runStatus; } - public void setProcessorRunStatus(String processorRunStatus) { - this.processorRunStatus = processorRunStatus; + public void setRunStatus(String runStatus) { + this.runStatus = runStatus; } + @XmlJavaTypeAdapter(TimeAdapter.class) @ApiModelProperty("The timestamp of when the stats were last refreshed") public Date getStatsLastRefreshed() { return statsLastRefreshed; @@ -119,17 +122,18 @@ public class ProcessorStatusDTO implements Cloneable { final ProcessorStatusDTO other = new ProcessorStatusDTO(); other.setGroupId(getGroupId()); other.setId(getId()); - other.setProcessorName(getProcessorName()); - other.setProcessorRunStatus(getProcessorRunStatus()); - other.setProcessorType(getProcessorType()); + other.setName(getName()); + other.setRunStatus(getRunStatus()); + other.setType(getType()); other.setStatsLastRefreshed(getStatsLastRefreshed()); other.setAggregateStatus(getAggregateStatus().clone()); final List<NodeProcessorStatusSnapshotDTO> nodeStatuses = getNodeStatuses(); - final List<NodeProcessorStatusSnapshotDTO> nodeSnapshots = new ArrayList<>(nodeStatuses.size()); + final List<NodeProcessorStatusSnapshotDTO> nodeStatusClones = new ArrayList<>(nodeStatuses.size()); for (final NodeProcessorStatusSnapshotDTO status : nodeStatuses) { - nodeSnapshots.add(status.clone()); + nodeStatusClones.add(status.clone()); } + other.setNodeStatuses(nodeStatusClones); return other; } http://git-wip-us.apache.org/repos/asf/nifi/blob/61a7cbea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusSnapshotDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusSnapshotDTO.java index d45425d..032fb57 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusSnapshotDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessorStatusSnapshotDTO.java @@ -46,7 +46,7 @@ public class ProcessorStatusSnapshotDTO implements Cloneable { private String output; private Integer taskCount = 0; - private Long taskDurationNanos = 0L; + private Long tasksDurationNanos = 0L; private String tasks; private String tasksDuration; private Integer activeThreadCount = 0; @@ -263,12 +263,12 @@ public class ProcessorStatusSnapshotDTO implements Cloneable { } @ApiModelProperty("The number of nanoseconds that this Processor has spent running in the last 5 minutes") - public Long getTaskDuration() { - return taskDurationNanos; + public Long getTasksDurationNanos() { + return tasksDurationNanos; } - public void setTaskDuration(Long taskNanos) { - this.taskDurationNanos = taskNanos; + public void setTasksDurationNanos(Long taskNanos) { + this.tasksDurationNanos = taskNanos; } @Override @@ -287,7 +287,8 @@ public class ProcessorStatusSnapshotDTO implements Cloneable { other.setFlowFilesOut(getFlowFilesOut()); other.setBytesOut(getBytesOut()); other.setTaskCount(getTaskCount()); - other.setTaskDuration(getTaskDuration()); + other.setTasksDuration(getTasksDuration()); + other.setTasksDurationNanos(getTasksDurationNanos()); other.setActiveThreadCount(getActiveThreadCount()); other.setInput(getInput()); other.setOutput(getOutput()); http://git-wip-us.apache.org/repos/asf/nifi/blob/61a7cbea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/RemoteProcessGroupStatusDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/RemoteProcessGroupStatusDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/RemoteProcessGroupStatusDTO.java index 6038a70..64ba62f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/RemoteProcessGroupStatusDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/RemoteProcessGroupStatusDTO.java @@ -17,11 +17,14 @@ package org.apache.nifi.web.api.dto.status; +import java.util.Date; import java.util.List; import javax.xml.bind.annotation.XmlType; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; import com.wordnik.swagger.annotations.ApiModelProperty; +import org.apache.nifi.web.api.dto.util.TimeAdapter; @XmlType(name = "remoteProcessGroupStatus") public class RemoteProcessGroupStatusDTO { @@ -30,6 +33,7 @@ public class RemoteProcessGroupStatusDTO { private String name; private String targetUri; private String transmissionStatus; + private Date statsLastRefreshed; private List<String> authorizationIssues; @@ -111,4 +115,20 @@ public class RemoteProcessGroupStatusDTO { this.nodeStatuses = nodeStatuses; } + /** + * When the status for this process group was calculated. + * + * @return The the status was calculated + */ + @XmlJavaTypeAdapter(TimeAdapter.class) + @ApiModelProperty( + value = "The time the status for the process group was last refreshed." + ) + public Date getStatsLastRefreshed() { + return statsLastRefreshed; + } + + public void setStatsLastRefreshed(Date statsLastRefreshed) { + this.statsLastRefreshed = statsLastRefreshed; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/61a7cbea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java index 0237e62..dd2a2f1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusMerger.java @@ -237,9 +237,9 @@ public class StatusMerger { /** * Updates the fields that are "pretty printed" based on the raw values currently set. For example, - * {@link ProcessGroupStatusDTO#setInput(String)} will be called with the pretty-printed form of the - * FlowFile counts and sizes retrieved via {@link ProcessGroupStatusDTO#getFlowFilesIn()} and - * {@link ProcessGroupStatusDTO#getBytesIn()}. + * {@link ProcessGroupStatusSnapshotDTO#setInput(String)} will be called with the pretty-printed form of the + * FlowFile counts and sizes retrieved via {@link ProcessGroupStatusSnapshotDTO#getFlowFilesIn()} and + * {@link ProcessGroupStatusSnapshotDTO#getBytesIn()}. * * This logic is performed here, rather than in the DTO itself because the DTO needs to be kept purely * getters & setters - otherwise the automatic marshalling and unmarshalling to/from JSON becomes very @@ -260,6 +260,47 @@ public class StatusMerger { target.setSent(prettyPrint(target.getFlowFilesSent(), target.getBytesSent())); } + public static void merge(final RemoteProcessGroupStatusDTO target, final RemoteProcessGroupStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { + merge(target.getAggregateStatus(), toMerge.getAggregateStatus()); + + if (target.getNodeStatuses() != null) { + final NodeRemoteProcessGroupStatusSnapshotDTO nodeSnapshot = new NodeRemoteProcessGroupStatusSnapshotDTO(); + nodeSnapshot.setStatusSnapshot(toMerge.getAggregateStatus()); + nodeSnapshot.setAddress(nodeAddress); + nodeSnapshot.setApiPort(nodeApiPort); + nodeSnapshot.setNodeId(nodeId); + + target.getNodeStatuses().add(nodeSnapshot); + } + } + + public static void merge(final PortStatusDTO target, final PortStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { + merge(target.getAggregateStatus(), toMerge.getAggregateStatus()); + + if (target.getNodeStatuses() != null) { + final NodePortStatusSnapshotDTO nodeSnapshot = new NodePortStatusSnapshotDTO(); + nodeSnapshot.setStatusSnapshot(toMerge.getAggregateStatus()); + nodeSnapshot.setAddress(nodeAddress); + nodeSnapshot.setApiPort(nodeApiPort); + nodeSnapshot.setNodeId(nodeId); + + target.getNodeStatuses().add(nodeSnapshot); + } + } + + public static void merge(final ConnectionStatusDTO target, final ConnectionStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { + merge(target.getAggregateStatus(), toMerge.getAggregateStatus()); + + if (target.getNodeStatuses() != null) { + final NodeConnectionStatusSnapshotDTO nodeSnapshot = new NodeConnectionStatusSnapshotDTO(); + nodeSnapshot.setStatusSnapshot(toMerge.getAggregateStatus()); + nodeSnapshot.setAddress(nodeAddress); + nodeSnapshot.setApiPort(nodeApiPort); + nodeSnapshot.setNodeId(nodeId); + + target.getNodeStatuses().add(nodeSnapshot); + } + } public static void merge(final ProcessorStatusDTO target, final ProcessorStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { merge(target.getAggregateStatus(), toMerge.getAggregateStatus()); @@ -296,7 +337,7 @@ public class StatusMerger { target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut()); target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut()); target.setTaskCount(target.getTaskCount() + toMerge.getTaskCount()); - target.setTaskDuration(target.getTaskDuration() + toMerge.getTaskDuration()); + target.setTasksDurationNanos(target.getTasksDurationNanos() + toMerge.getTasksDurationNanos()); target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount()); updatePrettyPrintedFields(target); } @@ -311,7 +352,7 @@ public class StatusMerger { final String tasks = (taskCount == null) ? "-" : formatCount(taskCount); target.setTasks(tasks); - target.setTasksDuration(FormatUtils.formatHoursMinutesSeconds(target.getTaskDuration(), TimeUnit.NANOSECONDS)); + target.setTasksDuration(FormatUtils.formatHoursMinutesSeconds(target.getTasksDurationNanos(), TimeUnit.NANOSECONDS)); } http://git-wip-us.apache.org/repos/asf/nifi/blob/61a7cbea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 9f5b3fc..8e4ca73 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -219,18 +219,26 @@ import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO; +import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; +import org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.NodePortStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.NodeProcessorStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.NodeStatusSnapshotsDTO; +import org.apache.nifi.web.api.dto.status.PortStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; +import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.api.dto.status.StatusMerger; import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; import org.apache.nifi.web.api.entity.BulletinBoardEntity; import org.apache.nifi.web.api.entity.ComponentStateEntity; +import org.apache.nifi.web.api.entity.ConnectionStatusEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; import org.apache.nifi.web.api.entity.ControllerServicesEntity; @@ -239,6 +247,7 @@ import org.apache.nifi.web.api.entity.CountersEntity; import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.FlowSnippetEntity; import org.apache.nifi.web.api.entity.ListingRequestEntity; +import org.apache.nifi.web.api.entity.PortStatusEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; @@ -247,6 +256,7 @@ import org.apache.nifi.web.api.entity.ProcessorsEntity; import org.apache.nifi.web.api.entity.ProvenanceEntity; import org.apache.nifi.web.api.entity.ProvenanceEventEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; +import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity; import org.apache.nifi.web.api.entity.ReportingTaskEntity; import org.apache.nifi.web.api.entity.ReportingTasksEntity; @@ -359,6 +369,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public static final Pattern CONNECTION_STATUS_HISTORY_URI_PATTERN = Pattern .compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/status/history"); + public static final Pattern CONNECTION_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/status"); + public static final Pattern INPUT_PORT_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/input-ports/[a-f0-9\\-]{36}/status"); + public static final Pattern OUTPUT_PORT_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/output-ports/[a-f0-9\\-]{36}/status"); + public static final Pattern REMOTE_PROCESS_GROUP_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}/status"); @Deprecated public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents"); @@ -2428,6 +2442,22 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return ("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches(); } + private static boolean isConnectionStatusEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && CONNECTION_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean isInputPortStatusEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && INPUT_PORT_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean isOutputPortStatusEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && OUTPUT_PORT_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + private static boolean isRemoteProcessGroupStatusEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUP_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + private static boolean isGroupStatusEndpoint(final URI uri, final String method) { return "GET".equalsIgnoreCase(method) && GROUP_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); } @@ -2581,6 +2611,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method) || isReportingTaskStateEndpoint(uri, method) || isDropRequestEndpoint(uri, method) || isListFlowFilesEndpoint(uri, method) || isGroupStatusEndpoint(uri, method) || isProcessorStatusEndpoint(uri, method) || isControllerStatusEndpoint(uri, method) + || isConnectionStatusEndpoint(uri, method) || isRemoteProcessGroupStatusEndpoint(uri, method) + || isInputPortStatusEndpoint(uri, method) || isOutputPortStatusEndpoint(uri, method) || isProcessorStatusHistoryEndpoint(uri, method) || isProcessGroupStatusHistoryEndpoint(uri, method) || isRemoteProcessGroupStatusHistoryEndpoint(uri, method) || isConnectionStatusHistoryEndpoint(uri, method) || isBulletinBoardEndpoint(uri, method) || isSystemDiagnosticsEndpoint(uri, method) @@ -2655,8 +2687,19 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } } - private void mergeGroupStatus(final ProcessGroupStatusDTO statusDto, final Map<NodeIdentifier, ProcessGroupStatusDTO> resultMap) { + private void mergeGroupStatus(final ProcessGroupStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, ProcessGroupStatusDTO> resultMap) { ProcessGroupStatusDTO mergedProcessGroupStatus = statusDto; + mergedProcessGroupStatus.setNodeStatuses(new ArrayList<NodeProcessGroupStatusSnapshotDTO>()); + + final NodeProcessGroupStatusSnapshotDTO selectedNodeSnapshot = new NodeProcessGroupStatusSnapshotDTO(); + selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateStatus().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); + + + mergedProcessGroupStatus.getNodeStatuses().add(selectedNodeSnapshot); + for (final Map.Entry<NodeIdentifier, ProcessGroupStatusDTO> entry : resultMap.entrySet()) { final NodeIdentifier nodeId = entry.getKey(); final ProcessGroupStatusDTO nodeProcessGroupStatus = entry.getValue(); @@ -2685,13 +2728,13 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final ProcessorStatusDTO mergedProcessorStatus = statusDto; mergedProcessorStatus.setNodeStatuses(new ArrayList<NodeProcessorStatusSnapshotDTO>()); - final NodeProcessorStatusSnapshotDTO nodeSnapshot = new NodeProcessorStatusSnapshotDTO(); - nodeSnapshot.setStatusSnapshot(statusDto.getAggregateStatus().clone()); - nodeSnapshot.setAddress(selectedNodeId.getApiAddress()); - nodeSnapshot.setApiPort(selectedNodeId.getApiPort()); - nodeSnapshot.setNodeId(selectedNodeId.getId()); + final NodeProcessorStatusSnapshotDTO selectedNodeSnapshot = new NodeProcessorStatusSnapshotDTO(); + selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateStatus().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); - mergedProcessorStatus.getNodeStatuses().add(nodeSnapshot); + mergedProcessorStatus.getNodeStatuses().add(selectedNodeSnapshot); // merge the other nodes for (final Map.Entry<NodeIdentifier, ProcessorStatusDTO> entry : resultMap.entrySet()) { @@ -2705,6 +2748,77 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } } + private void mergeConnectionStatus(final ConnectionStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, ConnectionStatusDTO> resultMap) { + final ConnectionStatusDTO mergedConnectionStatus = statusDto; + mergedConnectionStatus.setNodeStatuses(new ArrayList<NodeConnectionStatusSnapshotDTO>()); + + final NodeConnectionStatusSnapshotDTO selectedNodeSnapshot = new NodeConnectionStatusSnapshotDTO(); + selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateStatus().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); + + mergedConnectionStatus.getNodeStatuses().add(selectedNodeSnapshot); + + // merge the other nodes + for (final Map.Entry<NodeIdentifier, ConnectionStatusDTO> entry : resultMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ConnectionStatusDTO nodeConnectionStatus = entry.getValue(); + if (nodeConnectionStatus == statusDto) { + continue; + } + + StatusMerger.merge(mergedConnectionStatus, nodeConnectionStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + + private void mergePortStatus(final PortStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, PortStatusDTO> resultMap) { + final PortStatusDTO mergedPortStatus = statusDto; + mergedPortStatus.setNodeStatuses(new ArrayList<NodePortStatusSnapshotDTO>()); + + final NodePortStatusSnapshotDTO selectedNodeSnapshot = new NodePortStatusSnapshotDTO(); + selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateStatus().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); + + mergedPortStatus.getNodeStatuses().add(selectedNodeSnapshot); + + // merge the other nodes + for (final Map.Entry<NodeIdentifier, PortStatusDTO> entry : resultMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final PortStatusDTO nodePortStatus = entry.getValue(); + if (nodePortStatus == statusDto) { + continue; + } + + StatusMerger.merge(mergedPortStatus, nodePortStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + + private void mergeRemoteProcessGroupStatus(final RemoteProcessGroupStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, RemoteProcessGroupStatusDTO> resultMap) { + final RemoteProcessGroupStatusDTO mergedRemoteProcessGroupStatus = statusDto; + mergedRemoteProcessGroupStatus.setNodeStatuses(new ArrayList<NodeRemoteProcessGroupStatusSnapshotDTO>()); + + final NodeRemoteProcessGroupStatusSnapshotDTO selectedNodeSnapshot = new NodeRemoteProcessGroupStatusSnapshotDTO(); + selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateStatus().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); + + mergedRemoteProcessGroupStatus.getNodeStatuses().add(selectedNodeSnapshot); + + // merge the other nodes + for (final Map.Entry<NodeIdentifier, RemoteProcessGroupStatusDTO> entry : resultMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final RemoteProcessGroupStatusDTO nodeRemoteProcessGroupStatus = entry.getValue(); + if (nodeRemoteProcessGroupStatus == statusDto) { + continue; + } + + StatusMerger.merge(mergedRemoteProcessGroupStatus, nodeRemoteProcessGroupStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } private void mergeControllerStatus(final ControllerStatusDTO statusDto, final Map<NodeIdentifier, ControllerStatusDTO> resultMap) { ControllerStatusDTO mergedStatus = statusDto; @@ -3386,32 +3500,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C // create a new client response clientResponse = new NodeResponse(clientResponse, responseEntity); - } else if (hasSuccessfulClientResponse && isProcessorStatusEndpoint(uri, method)) { - final ProcessorStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessorStatusEntity.class); - final ProcessorStatusDTO statusRequest = responseEntity.getProcessorStatus(); - - NodeIdentifier nodeIdentifier = null; - - final Map<NodeIdentifier, ProcessorStatusDTO> resultsMap = new HashMap<>(); - for (final NodeResponse nodeResponse : updatedNodesMap.values()) { - if (problematicNodeResponses.contains(nodeResponse)) { - continue; - } - - final ProcessorStatusEntity nodeResponseEntity; - if (nodeResponse == clientResponse) { - nodeIdentifier = nodeResponse.getNodeId(); - nodeResponseEntity = responseEntity; - } else { - nodeResponseEntity = nodeResponse.getClientResponse().getEntity(ProcessorStatusEntity.class); - } - - final ProcessorStatusDTO nodeStatus = nodeResponseEntity.getProcessorStatus(); - resultsMap.put(nodeResponse.getNodeId(), nodeStatus); - } - mergeProcessorStatus(statusRequest, nodeIdentifier, resultsMap); - - clientResponse = new NodeResponse(clientResponse, responseEntity); } else if (hasSuccessfulClientResponse && isProcessGroupEndpoint(uri, method)) { final ProcessGroupEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessGroupEntity.class); final ProcessGroupDTO responseDto = responseEntity.getProcessGroup(); @@ -3791,18 +3879,130 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final ProcessGroupStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessGroupStatusEntity.class); final ProcessGroupStatusDTO statusRequest = responseEntity.getProcessGroupStatus(); + NodeIdentifier nodeIdentifier = null; + final Map<NodeIdentifier, ProcessGroupStatusDTO> resultsMap = new HashMap<>(); for (final NodeResponse nodeResponse : updatedNodesMap.values()) { if (problematicNodeResponses.contains(nodeResponse)) { continue; } - final ProcessGroupStatusEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessGroupStatusEntity.class); + final ProcessGroupStatusEntity nodeResponseEntity; + if (nodeResponse == clientResponse) { + nodeIdentifier = nodeResponse.getNodeId(); + nodeResponseEntity = responseEntity; + } else { + nodeResponseEntity = nodeResponse.getClientResponse().getEntity(ProcessGroupStatusEntity.class); + } + final ProcessGroupStatusDTO nodeStatus = nodeResponseEntity.getProcessGroupStatus(); + resultsMap.put(nodeResponse.getNodeId(), nodeStatus); + } + mergeGroupStatus(statusRequest, nodeIdentifier, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isProcessorStatusEndpoint(uri, method)) { + final ProcessorStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessorStatusEntity.class); + final ProcessorStatusDTO statusRequest = responseEntity.getProcessorStatus(); + + NodeIdentifier nodeIdentifier = null; + + final Map<NodeIdentifier, ProcessorStatusDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final ProcessorStatusEntity nodeResponseEntity; + if (nodeResponse == clientResponse) { + nodeIdentifier = nodeResponse.getNodeId(); + nodeResponseEntity = responseEntity; + } else { + nodeResponseEntity = nodeResponse.getClientResponse().getEntity(ProcessorStatusEntity.class); + } + + final ProcessorStatusDTO nodeStatus = nodeResponseEntity.getProcessorStatus(); + resultsMap.put(nodeResponse.getNodeId(), nodeStatus); + } + mergeProcessorStatus(statusRequest, nodeIdentifier, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isConnectionStatusEndpoint(uri, method)) { + final ConnectionStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(ConnectionStatusEntity.class); + final ConnectionStatusDTO statusRequest = responseEntity.getConnectionStatus(); + + NodeIdentifier nodeIdentifier = null; + + final Map<NodeIdentifier, ConnectionStatusDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final ConnectionStatusEntity nodeResponseEntity; + if (nodeResponse == clientResponse) { + nodeIdentifier = nodeResponse.getNodeId(); + nodeResponseEntity = responseEntity; + } else { + nodeResponseEntity = nodeResponse.getClientResponse().getEntity(ConnectionStatusEntity.class); + } + + final ConnectionStatusDTO nodeStatus = nodeResponseEntity.getConnectionStatus(); + resultsMap.put(nodeResponse.getNodeId(), nodeStatus); + } + mergeConnectionStatus(statusRequest, nodeIdentifier, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && (isInputPortStatusEndpoint(uri, method) || isOutputPortStatusEndpoint(uri, method))) { + final PortStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(PortStatusEntity.class); + final PortStatusDTO statusRequest = responseEntity.getPortStatus(); + + NodeIdentifier nodeIdentifier = null; + + final Map<NodeIdentifier, PortStatusDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final PortStatusEntity nodeResponseEntity; + if (nodeResponse == clientResponse) { + nodeIdentifier = nodeResponse.getNodeId(); + nodeResponseEntity = responseEntity; + } else { + nodeResponseEntity = nodeResponse.getClientResponse().getEntity(PortStatusEntity.class); + } + + final PortStatusDTO nodeStatus = nodeResponseEntity.getPortStatus(); + resultsMap.put(nodeResponse.getNodeId(), nodeStatus); + } + mergePortStatus(statusRequest, nodeIdentifier, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isRemoteProcessGroupStatusEndpoint(uri, method)) { + final RemoteProcessGroupStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(RemoteProcessGroupStatusEntity.class); + final RemoteProcessGroupStatusDTO statusRequest = responseEntity.getRemoteProcessGroupStatus(); + + NodeIdentifier nodeIdentifier = null; + + final Map<NodeIdentifier, RemoteProcessGroupStatusDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final RemoteProcessGroupStatusEntity nodeResponseEntity; + if (nodeResponse == clientResponse) { + nodeIdentifier = nodeResponse.getNodeId(); + nodeResponseEntity = responseEntity; + } else { + nodeResponseEntity = nodeResponse.getClientResponse().getEntity(RemoteProcessGroupStatusEntity.class); + } + final RemoteProcessGroupStatusDTO nodeStatus = nodeResponseEntity.getRemoteProcessGroupStatus(); resultsMap.put(nodeResponse.getNodeId(), nodeStatus); } - mergeGroupStatus(statusRequest, resultsMap); + mergeRemoteProcessGroupStatus(statusRequest, nodeIdentifier, resultsMap); clientResponse = new NodeResponse(clientResponse, responseEntity); } else if (hasSuccessfulClientResponse && isControllerStatusEndpoint(uri, method)) { http://git-wip-us.apache.org/repos/asf/nifi/blob/61a7cbea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 01ffd2a..ce83baf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -2112,7 +2112,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final ProcessGroupStatus status = new ProcessGroupStatus(); status.setId(group.getIdentifier()); status.setName(group.getName()); - status.setCreationTimestamp(new Date().getTime()); int activeGroupThreads = 0; long bytesRead = 0L; long bytesWritten = 0L; http://git-wip-us.apache.org/repos/asf/nifi/blob/61a7cbea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java index dea0407..f2ce875 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java @@ -25,6 +25,7 @@ import com.wordnik.swagger.annotations.Authorization; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.cluster.context.ClusterContext; import org.apache.nifi.cluster.context.ClusterContextThreadLocal; +import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.cluster.node.Node; @@ -340,7 +341,15 @@ public class ConnectionResource extends ApplicationResource { if (properties.isClusterManager()) { // determine where this request should be sent if (clusterNodeId == null) { - return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()); + final ConnectionStatusEntity entity = (ConnectionStatusEntity) nodeResponse.getUpdatedEntity(); + + // ensure there is an updated entity (result of merging) and prune the response as necessary + if (entity != null && !nodewise) { + entity.getConnectionStatus().setNodeStatuses(null); + } + + return nodeResponse.getResponse(); } else { // get the target node and ensure it exists final Node targetNode = clusterManager.getNode(clusterNodeId); @@ -359,11 +368,6 @@ public class ConnectionResource extends ApplicationResource { // get the specified connection status final ConnectionStatusDTO connectionStatus = serviceFacade.getConnectionStatus(groupId, id); - // prune the response as necessary - if (!nodewise) { - connectionStatus.setNodeStatuses(null); - } - // create the revision final RevisionDTO revision = new RevisionDTO(); revision.setClientId(clientId.getClientId()); http://git-wip-us.apache.org/repos/asf/nifi/blob/61a7cbea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java index a200831..c90c547 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java @@ -16,34 +16,14 @@ */ package org.apache.nifi.web.api; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.FormParam; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.core.Response; - +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.cluster.node.Node; @@ -65,12 +45,31 @@ import org.apache.nifi.web.api.request.IntegerParameter; import org.apache.nifi.web.api.request.LongParameter; import org.springframework.security.access.prepost.PreAuthorize; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.FormParam; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; /** * RESTful endpoint for managing an Input Port. @@ -434,7 +433,15 @@ public class InputPortResource extends ApplicationResource { if (properties.isClusterManager()) { // determine where this request should be sent if (clusterNodeId == null) { - return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()); + final PortStatusEntity entity = (PortStatusEntity) nodeResponse.getUpdatedEntity(); + + // ensure there is an updated entity (result of merging) and prune the response as necessary + if (entity != null && !nodewise) { + entity.getPortStatus().setNodeStatuses(null); + } + + return nodeResponse.getResponse(); } else { // get the target node and ensure it exists final Node targetNode = clusterManager.getNode(clusterNodeId); @@ -453,11 +460,6 @@ public class InputPortResource extends ApplicationResource { // get the specified input port status final PortStatusDTO portStatus = serviceFacade.getInputPortStatus(groupId, id); - // prune the response as necessary - if (!nodewise) { - portStatus.setNodeStatuses(null); - } - // create the revision final RevisionDTO revision = new RevisionDTO(); revision.setClientId(clientId.getClientId()); http://git-wip-us.apache.org/repos/asf/nifi/blob/61a7cbea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java index 450b005..26f08c3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java @@ -48,6 +48,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; +import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.cluster.node.Node; @@ -436,7 +437,15 @@ public class OutputPortResource extends ApplicationResource { if (properties.isClusterManager()) { // determine where this request should be sent if (clusterNodeId == null) { - return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()); + final PortStatusEntity entity = (PortStatusEntity) nodeResponse.getUpdatedEntity(); + + // ensure there is an updated entity (result of merging) and prune the response as necessary + if (entity != null && !nodewise) { + entity.getPortStatus().setNodeStatuses(null); + } + + return nodeResponse.getResponse(); } else { // get the target node and ensure it exists final Node targetNode = clusterManager.getNode(clusterNodeId); @@ -455,11 +464,6 @@ public class OutputPortResource extends ApplicationResource { // get the specified output port status final PortStatusDTO portStatus = serviceFacade.getOutputPortStatus(groupId, id); - // prune the response as necessary - if (!nodewise) { - portStatus.setNodeStatuses(null); - } - // create the revision final RevisionDTO revision = new RevisionDTO(); revision.setClientId(clientId.getClientId()); http://git-wip-us.apache.org/repos/asf/nifi/blob/61a7cbea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index 8b6c4b4..a42eaa0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -16,33 +16,15 @@ */ package org.apache.nifi.web.api; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.FormParam; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; - +import com.sun.jersey.api.core.ResourceContext; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.cluster.node.Node; @@ -69,13 +51,30 @@ import org.apache.nifi.web.api.request.DoubleParameter; import org.apache.nifi.web.api.request.LongParameter; import org.springframework.security.access.prepost.PreAuthorize; -import com.sun.jersey.api.core.ResourceContext; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.FormParam; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; /** * RESTful endpoint for managing a Group. @@ -1317,7 +1316,15 @@ public class ProcessGroupResource extends ApplicationResource { if (properties.isClusterManager()) { // determine where this request should be sent if (clusterNodeId == null) { - return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()); + final ProcessGroupStatusEntity entity = (ProcessGroupStatusEntity) nodeResponse.getUpdatedEntity(); + + // ensure there is an updated entity (result of merging) and prune the response as necessary + if (entity != null && !nodewise) { + entity.getProcessGroupStatus().setNodeStatuses(null); + } + + return nodeResponse.getResponse(); } else { // get the target node and ensure it exists final Node targetNode = clusterManager.getNode(clusterNodeId); @@ -1337,16 +1344,11 @@ public class ProcessGroupResource extends ApplicationResource { final ProcessGroupStatusDTO statusReport = serviceFacade.getProcessGroupStatus(groupId); // prune the response as necessary - if (!nodewise) { - statusReport.setNodeStatuses(null); - } - - // prune the response as necessary if (!recursive) { - prune(statusReport.getAggregateStatus()); + pruneChildGroups(statusReport.getAggregateStatus()); if (statusReport.getNodeStatuses() != null) { for (final NodeProcessGroupStatusSnapshotDTO nodeSnapshot : statusReport.getNodeStatuses()) { - prune(nodeSnapshot.getStatusSnapshot()); + pruneChildGroups(nodeSnapshot.getStatusSnapshot()); } } } @@ -1364,7 +1366,7 @@ public class ProcessGroupResource extends ApplicationResource { return clusterContext(generateOkResponse(entity)).build(); } - private void prune(final ProcessGroupStatusSnapshotDTO snapshot) { + private void pruneChildGroups(final ProcessGroupStatusSnapshotDTO snapshot) { for (final ProcessGroupStatusSnapshotDTO childProcessGroupStatus : snapshot.getProcessGroupStatusSnapshots()) { childProcessGroupStatus.setConnectionStatusSnapshots(null); childProcessGroupStatus.setProcessGroupStatusSnapshots(null); http://git-wip-us.apache.org/repos/asf/nifi/blob/61a7cbea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java index 19b29c4..285522a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java @@ -23,6 +23,7 @@ import com.wordnik.swagger.annotations.ApiResponse; import com.wordnik.swagger.annotations.ApiResponses; import com.wordnik.swagger.annotations.Authorization; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.cluster.node.Node; @@ -317,7 +318,15 @@ public class RemoteProcessGroupResource extends ApplicationResource { if (properties.isClusterManager()) { // determine where this request should be sent if (clusterNodeId == null) { - return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()); + final RemoteProcessGroupStatusEntity entity = (RemoteProcessGroupStatusEntity) nodeResponse.getUpdatedEntity(); + + // ensure there is an updated entity (result of merging) and prune the response as necessary + if (entity != null && !nodewise) { + entity.getRemoteProcessGroupStatus().setNodeStatuses(null); + } + + return nodeResponse.getResponse(); } else { // get the target node and ensure it exists final Node targetNode = clusterManager.getNode(clusterNodeId); @@ -336,11 +345,6 @@ public class RemoteProcessGroupResource extends ApplicationResource { // get the specified remote process group status final RemoteProcessGroupStatusDTO remoteProcessGroupStatus = serviceFacade.getRemoteProcessGroupStatus(groupId, id); - // prune the response as necessary - if (!nodewise) { - remoteProcessGroupStatus.setNodeStatuses(null); - } - // create the revision final RevisionDTO revision = new RevisionDTO(); revision.setClientId(clientId.getClientId()); http://git-wip-us.apache.org/repos/asf/nifi/blob/61a7cbea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 5c6ea7f..facc889 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -719,12 +719,14 @@ public final class DtoFactory { dto.setTargetUri(remoteProcessGroupStatus.getTargetUri()); dto.setName(remoteProcessGroupStatus.getName()); dto.setTransmissionStatus(remoteProcessGroupStatus.getTransmissionStatus().toString()); + dto.setStatsLastRefreshed(new Date()); final RemoteProcessGroupStatusSnapshotDTO snapshot = new RemoteProcessGroupStatusSnapshotDTO(); dto.setAggregateStatus(snapshot); snapshot.setId(remoteProcessGroupStatus.getId()); snapshot.setGroupId(remoteProcessGroupStatus.getGroupId()); + snapshot.setName(remoteProcessGroupStatus.getName()); snapshot.setTargetUri(remoteProcessGroupStatus.getTargetUri()); snapshot.setTransmissionStatus(remoteProcessGroupStatus.getTransmissionStatus().toString()); @@ -743,6 +745,7 @@ public final class DtoFactory { final ProcessGroupStatusDTO processGroupStatusDto = new ProcessGroupStatusDTO(); processGroupStatusDto.setId(processGroupStatus.getId()); processGroupStatusDto.setName(processGroupStatus.getName()); + processGroupStatusDto.setStatsLastRefreshed(new Date()); final ProcessGroupStatusSnapshotDTO snapshot = new ProcessGroupStatusSnapshotDTO(); processGroupStatusDto.setAggregateStatus(snapshot); @@ -750,7 +753,6 @@ public final class DtoFactory { snapshot.setId(processGroupStatus.getId()); snapshot.setName(processGroupStatus.getName()); - snapshot.setStatsLastRefreshed(new Date(processGroupStatus.getCreationTimestamp())); snapshot.setFlowFilesQueued(processGroupStatus.getQueuedCount()); snapshot.setBytesQueued(processGroupStatus.getQueuedContentSize()); snapshot.setBytesRead(processGroupStatus.getBytesRead()); @@ -786,7 +788,7 @@ public final class DtoFactory { if (connectionStatusCollection != null) { for (final ConnectionStatus connectionStatus : connectionStatusCollection) { final ConnectionStatusDTO connectionStatusDto = createConnectionStatusDto(connectionStatus); - connectionStatusDtoCollection.add(connectionStatusDto.getAggregateSnapshot()); + connectionStatusDtoCollection.add(connectionStatusDto.getAggregateStatus()); } } @@ -846,9 +848,10 @@ public final class DtoFactory { connectionStatusDto.setSourceName(connectionStatus.getSourceName()); connectionStatusDto.setDestinationId(connectionStatus.getDestinationId()); connectionStatusDto.setDestinationName(connectionStatus.getDestinationName()); + connectionStatusDto.setStatsLastRefreshed(new Date()); final ConnectionStatusSnapshotDTO snapshot = new ConnectionStatusSnapshotDTO(); - connectionStatusDto.setAggregateSnapshot(snapshot); + connectionStatusDto.setAggregateStatus(snapshot); snapshot.setId(connectionStatus.getId()); snapshot.setGroupId(connectionStatus.getGroupId()); @@ -873,7 +876,8 @@ public final class DtoFactory { final ProcessorStatusDTO dto = new ProcessorStatusDTO(); dto.setId(procStatus.getId()); dto.setGroupId(procStatus.getGroupId()); - dto.setProcessorName(procStatus.getName()); + dto.setName(procStatus.getName()); + dto.setStatsLastRefreshed(new Date()); final ProcessorStatusSnapshotDTO snapshot = new ProcessorStatusSnapshotDTO(); dto.setAggregateStatus(snapshot); @@ -892,7 +896,8 @@ public final class DtoFactory { snapshot.setBytesWritten(procStatus.getBytesWritten()); snapshot.setTaskCount(procStatus.getInvocations()); - snapshot.setTaskDuration(procStatus.getProcessingNanos()); + snapshot.setTasksDurationNanos(procStatus.getProcessingNanos()); + snapshot.setTasksDuration(FormatUtils.formatHoursMinutesSeconds(procStatus.getProcessingNanos(), TimeUnit.NANOSECONDS)); // determine the run status snapshot.setRunStatus(procStatus.getRunStatus().toString()); @@ -917,6 +922,7 @@ public final class DtoFactory { dto.setName(portStatus.getName()); dto.setRunStatus(portStatus.getRunStatus().toString()); dto.setTransmitting(portStatus.isTransmitting()); + dto.setStatsLastRefreshed(new Date()); final PortStatusSnapshotDTO snapshot = new PortStatusSnapshotDTO(); dto.setAggregateStatus(snapshot); http://git-wip-us.apache.org/repos/asf/nifi/blob/61a7cbea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js index 1935a71..41bc1f0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js @@ -850,7 +850,7 @@ nf.Canvas = (function () { nf.Graph.setStatus(aggregateStatus); // update the timestamp - $('#stats-last-refreshed').text(aggregateStatus.statsLastRefreshed); + $('#stats-last-refreshed').text(processGroupStatus.statsLastRefreshed); } deferred.resolve(); }).fail(function (xhr, status, error) {
