http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatusEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatusEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatusEntity.java new file mode 100644 index 0000000..8b62331 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatusEntity.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a ConnectionStatusDTO. + */ +@XmlRootElement(name = "connectionStatusEntity") +public class ConnectionStatusEntity extends Entity { + + private ConnectionStatusDTO connectionStatus; + + /** + * The ConnectionStatusDTO that is being serialized. + * + * @return The ConnectionStatusDTO object + */ + public ConnectionStatusDTO getConnectionStatus() { + return connectionStatus; + } + + public void setConnectionStatus(ConnectionStatusDTO connectionStatus) { + this.connectionStatus = connectionStatus; + } + +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/NodeSystemDiagnosticsEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/NodeSystemDiagnosticsEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/NodeSystemDiagnosticsEntity.java deleted file mode 100644 index 443276c..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/NodeSystemDiagnosticsEntity.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.web.api.entity; - -import javax.xml.bind.annotation.XmlRootElement; -import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO; - -/** - * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a NodeSystemDiagnosticsDTO. - */ -@XmlRootElement(name = "nodeSystemDiagnosticsEntity") -public class NodeSystemDiagnosticsEntity extends Entity { - - private NodeSystemDiagnosticsDTO nodeSystemDiagnostics; - - /** - * The NodeSystemDiagnosticsDTO that is being serialized. - * - * @return The NodeSystemDiagnosticsDTO object - */ - public NodeSystemDiagnosticsDTO getNodeSystemDiagnostics() { - return nodeSystemDiagnostics; - } - - public void setNodeSystemDiagnostics(NodeSystemDiagnosticsDTO nodeSystemDiagnostics) { - this.nodeSystemDiagnostics = nodeSystemDiagnostics; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PortStatusEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PortStatusEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PortStatusEntity.java new file mode 100644 index 0000000..e0b49c4 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PortStatusEntity.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import org.apache.nifi.web.api.dto.status.PortStatusDTO; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a PortStatusDTO. + */ +@XmlRootElement(name = "portStatusEntity") +public class PortStatusEntity extends Entity { + + private PortStatusDTO portStatus; + + /** + * The PortStatusDTO that is being serialized. + * + * @return The PortStatusDTO object + */ + public PortStatusDTO getPortStatus() { + return portStatus; + } + + public void setPortStatus(PortStatusDTO portStatus) { + this.portStatus = portStatus; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorStatusEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorStatusEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorStatusEntity.java new file mode 100644 index 0000000..0c2170c --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessorStatusEntity.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a ProcessorStatusDTO. + */ +@XmlRootElement(name = "processorStatusEntity") +public class ProcessorStatusEntity extends Entity { + + private ProcessorStatusDTO processorStatus; + + /** + * The ProcessorStatusDTO that is being serialized. + * + * @return The ProcessorStatusDTO object + */ + public ProcessorStatusDTO getProcessorStatus() { + return processorStatus; + } + + public void setProcessorStatus(ProcessorStatusDTO processorStatus) { + this.processorStatus = processorStatus; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupStatusEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupStatusEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupStatusEntity.java new file mode 100644 index 0000000..a5031ab --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RemoteProcessGroupStatusEntity.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a RemoteProcessGroupStatusDTO. + */ +@XmlRootElement(name = "remoteProcessGroupStatusEntity") +public class RemoteProcessGroupStatusEntity extends Entity { + + private RemoteProcessGroupStatusDTO remoteProcessGroupStatus; + + /** + * The RemoteProcessGroupStatusDTO that is being serialized. + * + * @return The RemoteProcessGroupStatusDTO object + */ + public RemoteProcessGroupStatusDTO getRemoteProcessGroupStatus() { + return remoteProcessGroupStatus; + } + + public void setRemoteProcessGroupStatus(RemoteProcessGroupStatusDTO remoteProcessGroupStatus) { + this.remoteProcessGroupStatus = remoteProcessGroupStatus; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java index f3e5df4..be0c339 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java @@ -20,7 +20,6 @@ import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; -import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; /** @@ -52,15 +51,6 @@ public interface NodeProtocolSender { void heartbeat(HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException; /** - * Sends a bulletins message to the cluster manager. - * - * @param msg a message - * @throws ProtocolException pe - * @throws UnknownServiceAddressException ex - */ - void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException; - - /** * Sends a failure notification if the controller was unable start. * * @param msg a message http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java index 993dea5..9ae6182 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java @@ -32,7 +32,6 @@ import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; -import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; @@ -117,11 +116,6 @@ public class NodeProtocolSenderImpl implements NodeProtocolSender { } @Override - public void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException { - sendProtocolMessage(msg); - } - - @Override public void notifyControllerStartupFailure(final ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException { sendProtocolMessage(msg); } http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java index 2992e38..0a9a064 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java @@ -24,7 +24,6 @@ import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolHandler; import org.apache.nifi.cluster.protocol.ProtocolListener; import org.apache.nifi.cluster.protocol.UnknownServiceAddressException; -import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage; @@ -104,11 +103,6 @@ public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolL } @Override - public void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException { - sender.sendBulletins(msg); - } - - @Override public void setBulletinRepository(final BulletinRepository bulletinRepository) { listener.setBulletinRepository(bulletinRepository); } http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java index f0a9fa7..516b67e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java @@ -26,7 +26,6 @@ import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage; -import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; import org.apache.nifi.cluster.protocol.message.PingMessage; import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; @@ -97,8 +96,4 @@ public class ObjectFactory { public PrimaryRoleAssignmentMessage createPrimaryRoleAssignmentMessage() { return new PrimaryRoleAssignmentMessage(); } - - public NodeBulletinsMessage createBulletinsMessage() { - return new NodeBulletinsMessage(); - } } http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java deleted file mode 100644 index 6df3ba4..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.message; - -import org.apache.nifi.cluster.protocol.NodeBulletins; -import javax.xml.bind.annotation.XmlRootElement; - -/** - */ -@XmlRootElement(name = "nodeBulletinsMessage") -public class NodeBulletinsMessage extends ProtocolMessage { - - private NodeBulletins bulletins; - - @Override - public MessageType getType() { - return MessageType.BULLETINS; - } - - public NodeBulletins getBulletins() { - return bulletins; - } - - public void setBulletins(NodeBulletins bulletins) { - this.bulletins = bulletins; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java index c6f7ce0..f01efd8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java @@ -21,8 +21,6 @@ public abstract class ProtocolMessage { private volatile String requestorDN; public static enum MessageType { - - BULLETINS, CONNECTION_REQUEST, CONNECTION_RESPONSE, CONTROLLER_STARTUP_FAILURE, http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java index 336d675..51de54b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java @@ -33,8 +33,6 @@ import org.apache.nifi.cluster.protocol.ConnectionRequest; import org.apache.nifi.cluster.protocol.ConnectionResponse; import org.apache.nifi.cluster.protocol.Heartbeat; import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.diagnostics.SystemDiagnostics; import org.apache.nifi.remote.cluster.NodeInformant; import org.apache.nifi.reporting.BulletinRepository; @@ -168,15 +166,4 @@ public interface ClusterManager extends NodeInformant { * @return the bulletin repository */ BulletinRepository getBulletinRepository(); - - /** - * @param groupId groupId - * @return a {@link ProcessGroupStatus} that represents the status of all nodes with the given {@link Status}es for the given ProcessGroup id, or null if no nodes exist with the given statuses - */ - ProcessGroupStatus getProcessGroupStatus(String groupId); - - /** - * @return a merged representation of the System Diagnostics for all nodes in the cluster - */ - SystemDiagnostics getSystemDiagnostics(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java new file mode 100644 index 0000000..66ad494 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java @@ -0,0 +1,646 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.manager; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.controller.status.RunStatus; +import org.apache.nifi.controller.status.TransmissionStatus; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.web.api.dto.BulletinDTO; +import org.apache.nifi.web.api.dto.CounterDTO; +import org.apache.nifi.web.api.dto.CountersDTO; +import org.apache.nifi.web.api.dto.CountersSnapshotDTO; +import org.apache.nifi.web.api.dto.NodeCountersSnapshotDTO; +import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsSnapshotDTO; +import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO; +import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO; +import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO.GarbageCollectionDTO; +import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO.StorageUsageDTO; +import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; +import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; +import org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.NodePortStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.NodeProcessorStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.PortStatusDTO; +import org.apache.nifi.web.api.dto.status.PortStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; +import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; +import org.apache.nifi.web.api.dto.status.ProcessorStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; +import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO; + +public class StatusMerger { + public static void merge(final ControllerStatusDTO target, final ControllerStatusDTO toMerge) { + if (target == null || toMerge == null) { + return; + } + + target.setActiveRemotePortCount(target.getActiveRemotePortCount() + toMerge.getActiveRemotePortCount()); + target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount()); + target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued()); + target.setDisabledCount(target.getDisabledCount() + toMerge.getDisabledCount()); + target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued()); + target.setInactiveRemotePortCount(target.getInactiveRemotePortCount() + toMerge.getInactiveRemotePortCount()); + target.setInvalidCount(target.getInvalidCount() + toMerge.getInvalidCount()); + target.setRunningCount(target.getRunningCount() + toMerge.getRunningCount()); + target.setStoppedCount(target.getStoppedCount() + toMerge.getStoppedCount()); + + target.setBulletins(mergeBulletins(target.getBulletins(), toMerge.getBulletins())); + target.setControllerServiceBulletins(mergeBulletins(target.getControllerServiceBulletins(), toMerge.getControllerServiceBulletins())); + target.setReportingTaskBulletins(mergeBulletins(target.getReportingTaskBulletins(), toMerge.getReportingTaskBulletins())); + + updatePrettyPrintedFields(target); + } + + public static void updatePrettyPrintedFields(final ControllerStatusDTO target) { + target.setQueued(prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued())); + target.setConnectedNodes(formatCount(target.getConnectedNodeCount()) + " / " + formatCount(target.getTotalNodeCount())); + } + + public static List<BulletinDTO> mergeBulletins(final List<BulletinDTO> targetBulletins, final List<BulletinDTO> toMerge) { + final List<BulletinDTO> bulletins = new ArrayList<>(); + if (targetBulletins != null) { + bulletins.addAll(targetBulletins); + } + + if (toMerge != null) { + bulletins.addAll(toMerge); + } + + return bulletins; + } + + + public static void merge(final ProcessGroupStatusDTO target, final ProcessGroupStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { + merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot()); + + if (target.getNodeSnapshots() != null) { + final NodeProcessGroupStatusSnapshotDTO nodeSnapshot = new NodeProcessGroupStatusSnapshotDTO(); + nodeSnapshot.setStatusSnapshot(toMerge.getAggregateSnapshot()); + nodeSnapshot.setAddress(nodeAddress); + nodeSnapshot.setApiPort(nodeApiPort); + nodeSnapshot.setNodeId(nodeId); + + target.getNodeSnapshots().add(nodeSnapshot); + } + } + + public static void merge(final ProcessGroupStatusSnapshotDTO target, final ProcessGroupStatusSnapshotDTO toMerge) { + if (target == null || toMerge == null) { + return; + } + + target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn()); + target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn()); + + target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued()); + target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued()); + + target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead()); + target.setBytesWritten(target.getBytesWritten() + toMerge.getBytesWritten()); + + target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut()); + target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut()); + + target.setBytesTransferred(target.getBytesTransferred() + toMerge.getBytesTransferred()); + target.setFlowFilesTransferred(target.getFlowFilesTransferred() + toMerge.getFlowFilesTransferred()); + + target.setBytesReceived(target.getBytesReceived() + toMerge.getBytesReceived()); + target.setFlowFilesReceived(target.getFlowFilesReceived() + toMerge.getFlowFilesReceived()); + + target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent()); + target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent()); + + target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount()); + updatePrettyPrintedFields(target); + + // connection status + // sort by id + final Map<String, ConnectionStatusSnapshotDTO> mergedConnectionMap = new HashMap<>(); + for (final ConnectionStatusSnapshotDTO status : replaceNull(target.getConnectionStatusSnapshots())) { + mergedConnectionMap.put(status.getId(), status); + } + + for (final ConnectionStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getConnectionStatusSnapshots())) { + ConnectionStatusSnapshotDTO merged = mergedConnectionMap.get(statusToMerge.getId()); + if (merged == null) { + mergedConnectionMap.put(statusToMerge.getId(), statusToMerge.clone()); + continue; + } + + merge(merged, statusToMerge); + } + target.setConnectionStatusSnapshots(mergedConnectionMap.values()); + + // processor status + final Map<String, ProcessorStatusSnapshotDTO> mergedProcessorMap = new HashMap<>(); + for (final ProcessorStatusSnapshotDTO status : replaceNull(target.getProcessorStatusSnapshots())) { + mergedProcessorMap.put(status.getId(), status); + } + + for (final ProcessorStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getProcessorStatusSnapshots())) { + ProcessorStatusSnapshotDTO merged = mergedProcessorMap.get(statusToMerge.getId()); + if (merged == null) { + mergedProcessorMap.put(statusToMerge.getId(), statusToMerge.clone()); + continue; + } + + merge(merged, statusToMerge); + } + target.setProcessorStatusSnapshots(mergedProcessorMap.values()); + + + // input ports + final Map<String, PortStatusSnapshotDTO> mergedInputPortMap = new HashMap<>(); + for (final PortStatusSnapshotDTO status : replaceNull(target.getInputPortStatusSnapshots())) { + mergedInputPortMap.put(status.getId(), status); + } + + for (final PortStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getInputPortStatusSnapshots())) { + PortStatusSnapshotDTO merged = mergedInputPortMap.get(statusToMerge.getId()); + if (merged == null) { + mergedInputPortMap.put(statusToMerge.getId(), statusToMerge.clone()); + continue; + } + + merge(merged, statusToMerge); + } + target.setInputPortStatusSnapshots(mergedInputPortMap.values()); + + // output ports + final Map<String, PortStatusSnapshotDTO> mergedOutputPortMap = new HashMap<>(); + for (final PortStatusSnapshotDTO status : replaceNull(target.getOutputPortStatusSnapshots())) { + mergedOutputPortMap.put(status.getId(), status); + } + + for (final PortStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getOutputPortStatusSnapshots())) { + PortStatusSnapshotDTO merged = mergedOutputPortMap.get(statusToMerge.getId()); + if (merged == null) { + mergedOutputPortMap.put(statusToMerge.getId(), statusToMerge.clone()); + continue; + } + + merge(merged, statusToMerge); + } + target.setOutputPortStatusSnapshots(mergedOutputPortMap.values()); + + // child groups + final Map<String, ProcessGroupStatusSnapshotDTO> mergedGroupMap = new HashMap<>(); + for (final ProcessGroupStatusSnapshotDTO status : replaceNull(target.getProcessGroupStatusSnapshots())) { + mergedGroupMap.put(status.getId(), status); + } + + for (final ProcessGroupStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getProcessGroupStatusSnapshots())) { + ProcessGroupStatusSnapshotDTO merged = mergedGroupMap.get(statusToMerge.getId()); + if (merged == null) { + mergedGroupMap.put(statusToMerge.getId(), statusToMerge.clone()); + continue; + } + + merge(merged, statusToMerge); + } + target.setOutputPortStatusSnapshots(mergedOutputPortMap.values()); + + // remote groups + final Map<String, RemoteProcessGroupStatusSnapshotDTO> mergedRemoteGroupMap = new HashMap<>(); + for (final RemoteProcessGroupStatusSnapshotDTO status : replaceNull(target.getRemoteProcessGroupStatusSnapshots())) { + mergedRemoteGroupMap.put(status.getId(), status); + } + + for (final RemoteProcessGroupStatusSnapshotDTO statusToMerge : replaceNull(toMerge.getRemoteProcessGroupStatusSnapshots())) { + RemoteProcessGroupStatusSnapshotDTO merged = mergedRemoteGroupMap.get(statusToMerge.getId()); + if (merged == null) { + mergedRemoteGroupMap.put(statusToMerge.getId(), statusToMerge.clone()); + continue; + } + + merge(merged, statusToMerge); + } + target.setRemoteProcessGroupStatusSnapshots(mergedRemoteGroupMap.values()); + } + + private static <T> Collection<T> replaceNull(final Collection<T> collection) { + return (collection == null) ? Collections.<T> emptyList() : collection; + } + + + /** + * Updates the fields that are "pretty printed" based on the raw values currently set. For example, + * {@link ProcessGroupStatusSnapshotDTO#setInput(String)} will be called with the pretty-printed form of the + * FlowFile counts and sizes retrieved via {@link ProcessGroupStatusSnapshotDTO#getFlowFilesIn()} and + * {@link ProcessGroupStatusSnapshotDTO#getBytesIn()}. + * + * This logic is performed here, rather than in the DTO itself because the DTO needs to be kept purely + * getters & setters - otherwise the automatic marshalling and unmarshalling to/from JSON becomes very + * complicated. + * + * @param target the DTO to update + */ + public static void updatePrettyPrintedFields(final ProcessGroupStatusSnapshotDTO target) { + target.setQueued(prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued())); + target.setQueuedCount(formatCount(target.getFlowFilesQueued())); + target.setQueuedSize(formatDataSize(target.getBytesQueued())); + target.setInput(prettyPrint(target.getFlowFilesIn(), target.getBytesIn())); + target.setRead(formatDataSize(target.getBytesRead())); + target.setWritten(formatDataSize(target.getBytesWritten())); + target.setOutput(prettyPrint(target.getFlowFilesOut(), target.getBytesOut())); + target.setTransferred(prettyPrint(target.getFlowFilesTransferred(), target.getBytesTransferred())); + target.setReceived(prettyPrint(target.getFlowFilesReceived(), target.getBytesReceived())); + target.setSent(prettyPrint(target.getFlowFilesSent(), target.getBytesSent())); + } + + public static void merge(final RemoteProcessGroupStatusDTO target, final RemoteProcessGroupStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { + merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot()); + + if (target.getNodeSnapshots() != null) { + final NodeRemoteProcessGroupStatusSnapshotDTO nodeSnapshot = new NodeRemoteProcessGroupStatusSnapshotDTO(); + nodeSnapshot.setStatusSnapshot(toMerge.getAggregateSnapshot()); + nodeSnapshot.setAddress(nodeAddress); + nodeSnapshot.setApiPort(nodeApiPort); + nodeSnapshot.setNodeId(nodeId); + + target.getNodeSnapshots().add(nodeSnapshot); + } + } + + public static void merge(final PortStatusDTO target, final PortStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { + merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot()); + + if (target.getNodeSnapshots() != null) { + final NodePortStatusSnapshotDTO nodeSnapshot = new NodePortStatusSnapshotDTO(); + nodeSnapshot.setStatusSnapshot(toMerge.getAggregateSnapshot()); + nodeSnapshot.setAddress(nodeAddress); + nodeSnapshot.setApiPort(nodeApiPort); + nodeSnapshot.setNodeId(nodeId); + + target.getNodeSnapshots().add(nodeSnapshot); + } + } + + public static void merge(final ConnectionStatusDTO target, final ConnectionStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { + merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot()); + + if (target.getNodeSnapshots() != null) { + final NodeConnectionStatusSnapshotDTO nodeSnapshot = new NodeConnectionStatusSnapshotDTO(); + nodeSnapshot.setStatusSnapshot(toMerge.getAggregateSnapshot()); + nodeSnapshot.setAddress(nodeAddress); + nodeSnapshot.setApiPort(nodeApiPort); + nodeSnapshot.setNodeId(nodeId); + + target.getNodeSnapshots().add(nodeSnapshot); + } + } + + public static void merge(final ProcessorStatusDTO target, final ProcessorStatusDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { + merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot()); + + if (target.getNodeSnapshots() != null) { + final NodeProcessorStatusSnapshotDTO nodeSnapshot = new NodeProcessorStatusSnapshotDTO(); + nodeSnapshot.setStatusSnapshot(toMerge.getAggregateSnapshot()); + nodeSnapshot.setAddress(nodeAddress); + nodeSnapshot.setApiPort(nodeApiPort); + nodeSnapshot.setNodeId(nodeId); + + target.getNodeSnapshots().add(nodeSnapshot); + } + } + + public static void merge(final ProcessorStatusSnapshotDTO target, final ProcessorStatusSnapshotDTO toMerge) { + if (target == null || toMerge == null) { + return; + } + + // if the status to merge is invalid allow it to take precedence. whether the + // processor run status is disabled/stopped/running is part of the flow configuration + // and should not differ amongst nodes. however, whether a processor is invalid + // can be driven by environmental conditions. this check allows any of those to + // take precedence over the configured run status. + if (RunStatus.Invalid.name().equals(toMerge.getRunStatus())) { + target.setRunStatus(RunStatus.Invalid.name()); + } + + target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead()); + target.setBytesWritten(target.getBytesWritten() + toMerge.getBytesWritten()); + target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn()); + target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn()); + target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut()); + target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut()); + target.setTaskCount(target.getTaskCount() + toMerge.getTaskCount()); + target.setTasksDurationNanos(target.getTasksDurationNanos() + toMerge.getTasksDurationNanos()); + target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount()); + updatePrettyPrintedFields(target); + } + + public static void updatePrettyPrintedFields(final ProcessorStatusSnapshotDTO target) { + target.setInput(prettyPrint(target.getFlowFilesIn(), target.getBytesIn())); + target.setRead(formatDataSize(target.getBytesRead())); + target.setWritten(formatDataSize(target.getBytesWritten())); + target.setOutput(prettyPrint(target.getFlowFilesOut(), target.getBytesOut())); + + final Integer taskCount = target.getTaskCount(); + final String tasks = (taskCount == null) ? "-" : formatCount(taskCount); + target.setTasks(tasks); + + target.setTasksDuration(FormatUtils.formatHoursMinutesSeconds(target.getTasksDurationNanos(), TimeUnit.NANOSECONDS)); + } + + + public static void merge(final ConnectionStatusSnapshotDTO target, final ConnectionStatusSnapshotDTO toMerge) { + if (target == null || toMerge == null) { + return; + } + + target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn()); + target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn()); + target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut()); + target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut()); + target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued()); + target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued()); + updatePrettyPrintedFields(target); + } + + public static void updatePrettyPrintedFields(final ConnectionStatusSnapshotDTO target) { + target.setQueued(prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued())); + target.setQueuedCount(formatCount(target.getFlowFilesQueued())); + target.setQueuedSize(formatDataSize(target.getBytesQueued())); + target.setInput(prettyPrint(target.getFlowFilesIn(), target.getBytesIn())); + target.setOutput(prettyPrint(target.getFlowFilesOut(), target.getBytesOut())); + } + + + + public static void merge(final RemoteProcessGroupStatusSnapshotDTO target, final RemoteProcessGroupStatusSnapshotDTO toMerge) { + final String transmittingValue = TransmissionStatus.Transmitting.name(); + if (transmittingValue.equals(target.getTransmissionStatus()) || transmittingValue.equals(toMerge.getTransmissionStatus())) { + target.setTransmissionStatus(transmittingValue); + } + + target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount()); + + final List<String> authIssues = new ArrayList<>(); + if (target.getAuthorizationIssues() != null) { + authIssues.addAll(target.getAuthorizationIssues()); + } + if (toMerge.getAuthorizationIssues() != null) { + authIssues.addAll(toMerge.getAuthorizationIssues()); + } + target.setAuthorizationIssues(authIssues); + + target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent()); + target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent()); + target.setFlowFilesReceived(target.getFlowFilesReceived() + toMerge.getFlowFilesReceived()); + target.setBytesReceived(target.getBytesReceived() + toMerge.getBytesReceived()); + updatePrettyPrintedFields(target); + } + + public static void updatePrettyPrintedFields(final RemoteProcessGroupStatusSnapshotDTO target) { + target.setReceived(prettyPrint(target.getFlowFilesReceived(), target.getBytesReceived())); + target.setSent(prettyPrint(target.getFlowFilesSent(), target.getBytesSent())); + } + + + + public static void merge(final PortStatusSnapshotDTO target, final PortStatusSnapshotDTO toMerge) { + if (target == null || toMerge == null) { + return; + } + + target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount()); + target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn()); + target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn()); + target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut()); + target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut()); + target.setTransmitting(Boolean.TRUE.equals(target.isTransmitting()) || Boolean.TRUE.equals(toMerge.isTransmitting())); + + // should be unnecessary here since ports run status not should be affected by + // environmental conditions but doing so in case that changes + if (RunStatus.Invalid.name().equals(toMerge.getRunStatus())) { + target.setRunStatus(RunStatus.Invalid.name()); + } + + updatePrettyPrintedFields(target); + } + + public static void updatePrettyPrintedFields(final PortStatusSnapshotDTO target) { + target.setInput(prettyPrint(target.getFlowFilesIn(), target.getBytesIn())); + target.setOutput(prettyPrint(target.getFlowFilesOut(), target.getBytesOut())); + } + + + public static void merge(final SystemDiagnosticsDTO target, final SystemDiagnosticsDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { + merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot()); + + List<NodeSystemDiagnosticsSnapshotDTO> nodeSnapshots = target.getNodeSnapshots(); + if (nodeSnapshots == null) { + nodeSnapshots = new ArrayList<>(); + } + + final NodeSystemDiagnosticsSnapshotDTO nodeSnapshot = new NodeSystemDiagnosticsSnapshotDTO(); + nodeSnapshot.setAddress(nodeAddress); + nodeSnapshot.setApiPort(nodeApiPort); + nodeSnapshot.setNodeId(nodeId); + nodeSnapshot.setSnapshot(toMerge.getAggregateSnapshot()); + + nodeSnapshots.add(nodeSnapshot); + target.setNodeSnapshots(nodeSnapshots); + } + + public static void merge(final SystemDiagnosticsSnapshotDTO target, final SystemDiagnosticsSnapshotDTO toMerge) { + if (target == null || toMerge == null) { + return; + } + + target.setAvailableProcessors(target.getAvailableProcessors() + toMerge.getAvailableProcessors()); + target.setDaemonThreads(target.getDaemonThreads() + toMerge.getDaemonThreads()); + target.setFreeHeapBytes(target.getFreeHeapBytes() + toMerge.getFreeHeapBytes()); + target.setFreeNonHeapBytes(target.getFreeNonHeapBytes() + toMerge.getFreeNonHeapBytes()); + target.setMaxHeapBytes(target.getMaxHeapBytes() + toMerge.getMaxHeapBytes()); + target.setMaxNonHeapBytes(target.getMaxNonHeapBytes() + toMerge.getMaxNonHeapBytes()); + target.setProcessorLoadAverage(target.getProcessorLoadAverage() + toMerge.getProcessorLoadAverage()); + target.setTotalHeapBytes(target.getTotalHeapBytes() + toMerge.getTotalHeapBytes()); + target.setTotalNonHeapBytes(target.getTotalNonHeapBytes() + toMerge.getTotalNonHeapBytes()); + target.setTotalThreads(target.getTotalThreads() + toMerge.getTotalThreads()); + target.setUsedHeapBytes(target.getUsedHeapBytes() + toMerge.getUsedHeapBytes()); + target.setUsedNonHeapBytes(target.getUsedNonHeapBytes() + toMerge.getUsedNonHeapBytes()); + + merge(target.getContentRepositoryStorageUsage(), toMerge.getContentRepositoryStorageUsage()); + merge(target.getFlowFileRepositoryStorageUsage(), toMerge.getFlowFileRepositoryStorageUsage()); + mergeGarbageCollection(target.getGarbageCollection(), toMerge.getGarbageCollection()); + + updatePrettyPrintedFields(target); + } + + public static void updatePrettyPrintedFields(final SystemDiagnosticsSnapshotDTO target) { + // heap + target.setMaxHeap(FormatUtils.formatDataSize(target.getMaxHeapBytes())); + target.setTotalHeap(FormatUtils.formatDataSize(target.getTotalHeapBytes())); + target.setUsedHeap(FormatUtils.formatDataSize(target.getUsedHeapBytes())); + target.setFreeHeap(FormatUtils.formatDataSize(target.getFreeHeapBytes())); + if (target.getMaxHeapBytes() != -1) { + target.setHeapUtilization(FormatUtils.formatUtilization(getUtilization(target.getUsedHeapBytes(), target.getMaxHeapBytes()))); + } + + // non heap + target.setMaxNonHeap(FormatUtils.formatDataSize(target.getMaxNonHeapBytes())); + target.setTotalNonHeap(FormatUtils.formatDataSize(target.getTotalNonHeapBytes())); + target.setUsedNonHeap(FormatUtils.formatDataSize(target.getUsedNonHeapBytes())); + target.setFreeNonHeap(FormatUtils.formatDataSize(target.getFreeNonHeapBytes())); + if (target.getMaxNonHeapBytes() != -1) { + target.setNonHeapUtilization(FormatUtils.formatUtilization(getUtilization(target.getUsedNonHeapBytes(), target.getMaxNonHeapBytes()))); + } + } + + public static void merge(final Set<StorageUsageDTO> targetSet, final Set<StorageUsageDTO> toMerge) { + final Map<String, StorageUsageDTO> storageById = new HashMap<>(); + for (final StorageUsageDTO targetUsage : targetSet) { + storageById.put(targetUsage.getIdentifier(), targetUsage); + } + + for (final StorageUsageDTO usageToMerge : toMerge) { + final StorageUsageDTO targetUsage = storageById.get(usageToMerge.getIdentifier()); + if (targetUsage == null) { + storageById.put(usageToMerge.getIdentifier(), usageToMerge); + } else { + merge(targetUsage, usageToMerge); + } + } + + targetSet.clear(); + targetSet.addAll(storageById.values()); + } + + public static void merge(final StorageUsageDTO target, final StorageUsageDTO toMerge) { + target.setFreeSpaceBytes(target.getFreeSpaceBytes() + toMerge.getFreeSpaceBytes()); + target.setTotalSpaceBytes(target.getTotalSpaceBytes() + toMerge.getTotalSpaceBytes()); + target.setUsedSpaceBytes(target.getUsedSpaceBytes() + toMerge.getUsedSpaceBytes()); + updatePrettyPrintedFields(target); + } + + public static void updatePrettyPrintedFields(final StorageUsageDTO target) { + target.setFreeSpace(FormatUtils.formatDataSize(target.getFreeSpaceBytes())); + target.setTotalSpace(FormatUtils.formatDataSize(target.getTotalSpaceBytes())); + target.setUsedSpace(FormatUtils.formatDataSize(target.getUsedSpaceBytes())); + + if (target.getTotalSpaceBytes() != -1) { + target.setUtilization(FormatUtils.formatUtilization(getUtilization(target.getUsedSpaceBytes(), target.getTotalSpaceBytes()))); + } + } + + + public static void mergeGarbageCollection(final Set<GarbageCollectionDTO> targetSet, final Set<GarbageCollectionDTO> toMerge) { + final Map<String, GarbageCollectionDTO> storageById = new HashMap<>(); + for (final GarbageCollectionDTO targetUsage : targetSet) { + storageById.put(targetUsage.getName(), targetUsage); + } + + for (final GarbageCollectionDTO usageToMerge : toMerge) { + final GarbageCollectionDTO targetUsage = storageById.get(usageToMerge.getName()); + if (targetUsage == null) { + storageById.put(usageToMerge.getName(), usageToMerge); + } else { + merge(targetUsage, usageToMerge); + } + } + + targetSet.clear(); + targetSet.addAll(storageById.values()); + } + + public static void merge(final GarbageCollectionDTO target, final GarbageCollectionDTO toMerge) { + target.setCollectionCount(target.getCollectionCount() + toMerge.getCollectionCount()); + target.setCollectionMillis(target.getCollectionMillis() + toMerge.getCollectionMillis()); + updatePrettyPrintedFields(target); + } + + public static void updatePrettyPrintedFields(final GarbageCollectionDTO target) { + target.setCollectionTime(FormatUtils.formatHoursMinutesSeconds(target.getCollectionMillis(), TimeUnit.MILLISECONDS)); + } + + public static void merge(final CountersDTO target, final CountersDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { + merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot()); + + List<NodeCountersSnapshotDTO> nodeSnapshots = target.getNodeSnapshots(); + if (nodeSnapshots == null) { + nodeSnapshots = new ArrayList<>(); + } + + final NodeCountersSnapshotDTO nodeCountersSnapshot = new NodeCountersSnapshotDTO(); + nodeCountersSnapshot.setNodeId(nodeId); + nodeCountersSnapshot.setAddress(nodeAddress); + nodeCountersSnapshot.setApiPort(nodeApiPort); + nodeCountersSnapshot.setSnapshot(toMerge.getAggregateSnapshot()); + + nodeSnapshots.add(nodeCountersSnapshot); + + target.setNodeSnapshots(nodeSnapshots); + } + + public static void merge(final CountersSnapshotDTO target, final CountersSnapshotDTO toMerge) { + final Map<String, CounterDTO> counters = new HashMap<>(); + + for (final CounterDTO counter : target.getCounters()) { + counters.put(counter.getId(), counter); + } + + for (final CounterDTO counter : toMerge.getCounters()) { + final CounterDTO existing = counters.get(counter.getId()); + if (existing == null) { + counters.put(counter.getId(), counter); + } else { + merge(existing, counter); + } + } + + target.setCounters(counters.values()); + } + + public static void merge(final CounterDTO target, final CounterDTO toMerge) { + target.setValueCount(target.getValueCount() + toMerge.getValueCount()); + target.setValue(FormatUtils.formatCount(target.getValueCount())); + } + + + public static int getUtilization(final double used, final double total) { + return (int) Math.round((used / total) * 100); + } + + public static String formatCount(final Integer intStatus) { + return intStatus == null ? "-" : FormatUtils.formatCount(intStatus); + } + + public static String formatDataSize(final Long longStatus) { + return longStatus == null ? "-" : FormatUtils.formatDataSize(longStatus); + } + + public static String prettyPrint(final Integer count, final Long bytes) { + return formatCount(count) + " / " + formatDataSize(bytes); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java index d3d5559..3966a31 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java @@ -46,7 +46,7 @@ public class ClusteredEventAccess implements EventAccess { @Override public ProcessGroupStatus getControllerStatus() { - return clusterManager.getProcessGroupStatus(WebClusterManager.ROOT_GROUP_ID_ALIAS); + return new ProcessGroupStatus(); } @Override
