Repository: nifi Updated Branches: refs/heads/master b3f36489a -> e42ea9ad4
http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java deleted file mode 100644 index 1146a39..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.InputStream; -import java.io.OutputStream; - -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Marshaller; -import javax.xml.bind.Unmarshaller; -import javax.xml.bind.annotation.XmlRootElement; - -import org.apache.nifi.cluster.protocol.ProtocolException; - -/** - * The payload of the heartbeat. The payload contains status to inform the cluster manager the current workload of this node. - * - */ -@XmlRootElement -public class HeartbeatPayload { - - private static final JAXBContext JAXB_CONTEXT; - - static { - try { - JAXB_CONTEXT = JAXBContext.newInstance(HeartbeatPayload.class); - } catch (JAXBException e) { - throw new RuntimeException("Unable to create JAXBContext."); - } - } - - private int activeThreadCount; - private long totalFlowFileCount; - private long totalFlowFileBytes; - private long systemStartTime; - - public int getActiveThreadCount() { - return activeThreadCount; - } - - public void setActiveThreadCount(final int activeThreadCount) { - this.activeThreadCount = activeThreadCount; - } - - public long getTotalFlowFileCount() { - return totalFlowFileCount; - } - - public void setTotalFlowFileCount(final long totalFlowFileCount) { - this.totalFlowFileCount = totalFlowFileCount; - } - - public long getTotalFlowFileBytes() { - return totalFlowFileBytes; - } - - public void setTotalFlowFileBytes(final long totalFlowFileBytes) { - this.totalFlowFileBytes = totalFlowFileBytes; - } - - public long getSystemStartTime() { - return systemStartTime; - } - - public void setSystemStartTime(final long systemStartTime) { - this.systemStartTime = systemStartTime; - } - - public byte[] marshal() throws ProtocolException { - final ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream(); - marshal(this, payloadBytes); - return payloadBytes.toByteArray(); - } - - public static void marshal(final HeartbeatPayload payload, final OutputStream os) throws ProtocolException { - try { - final Marshaller marshaller = JAXB_CONTEXT.createMarshaller(); - marshaller.marshal(payload, os); - } catch (final JAXBException je) { - throw new ProtocolException(je); - } - } - - public static HeartbeatPayload unmarshal(final InputStream is) throws ProtocolException { - try { - final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); - return (HeartbeatPayload) unmarshaller.unmarshal(is); - } catch (final JAXBException je) { - throw new ProtocolException(je); - } - } - - public static HeartbeatPayload unmarshal(final byte[] bytes) throws ProtocolException { - try { - final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); - return (HeartbeatPayload) unmarshaller.unmarshal(new ByteArrayInputStream(bytes)); - } catch (final JAXBException je) { - throw new ProtocolException(je); - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 77c3dd7..efdf152 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -65,7 +65,6 @@ import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.DataAuthorizable; import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.user.NiFiUser; -import org.apache.nifi.cluster.HeartbeatPayload; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor; import org.apache.nifi.cluster.coordination.node.ClusterRoles; @@ -74,6 +73,7 @@ import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.cluster.protocol.Heartbeat; +import org.apache.nifi.cluster.protocol.HeartbeatPayload; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.NodeProtocolSender; import org.apache.nifi.cluster.protocol.UnknownServiceAddressException; @@ -96,7 +96,6 @@ import org.apache.nifi.controller.exception.ComponentLifeCycleException; import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.label.StandardLabel; -import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager; import org.apache.nifi.controller.leader.election.LeaderElectionManager; import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener; import org.apache.nifi.controller.queue.FlowFileQueue; @@ -388,6 +387,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R bulletinRepo, /* cluster coordinator */ null, /* heartbeat monitor */ null, + /* leader election manager */ null, /* variable registry */ variableRegistry); } @@ -401,7 +401,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final BulletinRepository bulletinRepo, final ClusterCoordinator clusterCoordinator, final HeartbeatMonitor heartbeatMonitor, - VariableRegistry variableRegistry) { + final LeaderElectionManager leaderElectionManager, + final VariableRegistry variableRegistry) { final FlowController flowController = new FlowController( flowFileEventRepo, @@ -413,7 +414,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R protocolSender, bulletinRepo, clusterCoordinator, - heartbeatMonitor, variableRegistry); + heartbeatMonitor, + leaderElectionManager, + variableRegistry); return flowController; } @@ -429,6 +432,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final BulletinRepository bulletinRepo, final ClusterCoordinator clusterCoordinator, final HeartbeatMonitor heartbeatMonitor, + final LeaderElectionManager leaderElectionManager, final VariableRegistry variableRegistry) { maxTimerDrivenThreads = new AtomicInteger(10); @@ -578,10 +582,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED); heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false)); + this.leaderElectionManager = leaderElectionManager; if (configuredForClustering) { - leaderElectionManager = new CuratorLeaderElectionManager(4, properties); - heartbeater = new ClusterProtocolHeartbeater(protocolSender, properties); + heartbeater = new ClusterProtocolHeartbeater(protocolSender, clusterCoordinator, leaderElectionManager); // Check if there is already a cluster coordinator elected. If not, go ahead // and register for coordinator role. If there is already one elected, do not register until @@ -601,7 +605,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R leaderElectionManager.start(); } else { - leaderElectionManager = null; heartbeater = null; } } @@ -3307,6 +3310,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private void registerForClusterCoordinator() { + final String participantId = heartbeatMonitor.getHeartbeatAddress(); + leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() { @Override public synchronized void onLeaderRelinquish() { @@ -3320,25 +3325,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // call start() when we become the leader, and this will ensure that initialization is handled. The heartbeat monitor // then will check the zookeeper znode to check if it is the cluster coordinator before kicking any nodes out of the // cluster. - - if (clusterCoordinator != null) { - clusterCoordinator.removeRole(ClusterRoles.CLUSTER_COORDINATOR); - } } @Override public synchronized void onLeaderElection() { LOG.info("This node elected Active Cluster Coordinator"); heartbeatMonitor.start(); // ensure heartbeat monitor is started - - if (clusterCoordinator != null) { - clusterCoordinator.addRole(ClusterRoles.CLUSTER_COORDINATOR); - } } - }); + }, participantId); } private void registerForPrimaryNode() { + final String participantId = heartbeatMonitor.getHeartbeatAddress(); + leaderElectionManager.register(ClusterRoles.PRIMARY_NODE, new LeaderElectionStateChangeListener() { @Override public void onLeaderElection() { @@ -3349,7 +3348,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public void onLeaderRelinquish() { setPrimary(false); } - }); + }, participantId); } /** @@ -3854,7 +3853,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R @Override public void run() { - try { + try (final NarCloseable narCloseable = NarCloseable.withFrameworkNar()) { if (heartbeatsSuspended.get()) { return; } @@ -3916,6 +3915,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final QueueSize queueSize = getTotalFlowFileCount(bean.getRootGroup()); hbPayload.setTotalFlowFileCount(queueSize.getObjectCount()); hbPayload.setTotalFlowFileBytes(queueSize.getByteCount()); + hbPayload.setClusterStatus(clusterCoordinator.getConnectionStatuses()); // create heartbeat message final NodeIdentifier nodeId = getNodeId(); @@ -3924,15 +3924,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return null; } - final Set<String> roles = new HashSet<>(); - if (bean.isPrimary()) { - roles.add(ClusterRoles.PRIMARY_NODE); - } - if (clusterCoordinator.isActiveClusterCoordinator()) { - roles.add(ClusterRoles.CLUSTER_COORDINATOR); - } - - final Heartbeat heartbeat = new Heartbeat(nodeId, roles, connectionStatus, hbPayload.marshal()); + final Heartbeat heartbeat = new Heartbeat(nodeId, connectionStatus, hbPayload.marshal()); final HeartbeatMessage message = new HeartbeatMessage(); message.setHeartbeat(heartbeat); http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 091e59c..42f239c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -35,7 +35,6 @@ import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -853,9 +852,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // mark the node as clustered controller.setClustered(true, response.getInstanceId()); - final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId); - final Set<String> roles = status == null ? Collections.emptySet() : status.getRoles(); - controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, roles)); + controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED)); // start the processors as indicated by the dataflow controller.onFlowInitialized(autoResumeState); http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index d1822ef..cb663e1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -16,6 +16,34 @@ */ package org.apache.nifi.controller; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; + +import javax.xml.XMLConstants; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.validation.Schema; +import javax.xml.validation.SchemaFactory; + import org.apache.commons.collections4.CollectionUtils; import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer; import org.apache.nifi.authorization.Authorizer; @@ -41,7 +69,6 @@ import org.apache.nifi.controller.serialization.FlowSynchronizer; import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.controller.service.ControllerServiceLoader; import org.apache.nifi.controller.service.ControllerServiceNode; -import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.fingerprint.FingerprintException; @@ -66,7 +93,6 @@ import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.file.FileUtils; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; -import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.FunnelDTO; import org.apache.nifi.web.api.dto.LabelDTO; @@ -86,33 +112,6 @@ import org.w3c.dom.Node; import org.w3c.dom.NodeList; import org.xml.sax.SAXException; -import javax.xml.XMLConstants; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; -import javax.xml.validation.Schema; -import javax.xml.validation.SchemaFactory; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.zip.GZIPInputStream; - /** */ public class StandardFlowSynchronizer implements FlowSynchronizer { @@ -360,10 +359,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { // enable all the original controller services ControllerServiceLoader.enableControllerServices(controllerServices, controller, encryptor, autoResumeState); - } else { - for (final Element serviceElement : serviceElements) { - updateControllerService(controller, serviceElement, encryptor); - } } } @@ -505,22 +500,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { return baos.toByteArray(); } - private void updateControllerService(final FlowController controller, final Element controllerServiceElement, final StringEncryptor encryptor) { - final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); - - final ControllerServiceState dtoState = ControllerServiceState.valueOf(dto.getState()); - final boolean dtoEnabled = (dtoState == ControllerServiceState.ENABLED || dtoState == ControllerServiceState.ENABLING); - - final ControllerServiceNode serviceNode = controller.getControllerServiceNode(dto.getId()); - final ControllerServiceState serviceState = serviceNode.getState(); - final boolean serviceEnabled = (serviceState == ControllerServiceState.ENABLED || serviceState == ControllerServiceState.ENABLING); - - if (dtoEnabled && !serviceEnabled) { - controller.enableControllerService(controller.getControllerServiceNode(dto.getId())); - } else if (!dtoEnabled && serviceEnabled) { - controller.disableControllerService(controller.getControllerServiceNode(dto.getId())); - } - } private ReportingTaskNode getOrCreateReportingTask(final FlowController controller, final ReportingTaskDTO dto, final boolean controllerInitialized, final boolean existingFlowEmpty) throws ReportingTaskInstantiationException { @@ -665,12 +644,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { // get the real process group and ID final ProcessGroup processGroup = controller.getGroup(processGroupDto.getId()); - // Update Controller Services - final List<Element> serviceNodeList = getChildrenByTagName(processGroupElement, "controllerService"); - for (final Element serviceNodeElement : serviceNodeList) { - updateControllerService(controller, serviceNodeElement, encryptor); - } - // processors & ports cannot be updated - they must be the same. Except for the scheduled state. final List<Element> processorNodeList = getChildrenByTagName(processGroupElement, "processor"); for (final Element processorElement : processorNodeList) { http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java index 0240318..d675d0c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java @@ -18,102 +18,79 @@ package org.apache.nifi.controller.cluster; import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Properties; - -import org.apache.curator.RetryPolicy; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.RetryNTimes; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.node.ClusterRoles; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.protocol.HeartbeatPayload; +import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.NodeProtocolSender; import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; +import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage; +import org.apache.nifi.controller.leader.election.LeaderElectionManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Uses ZooKeeper in order to determine which node is the elected Cluster Coordinator and to indicate - * that this node is part of the cluster. However, once the Cluster Coordinator is known, heartbeats are + * Uses Leader Election Manager in order to determine which node is the elected Cluster Coordinator and to indicate + * that this node is part of the cluster. Once the Cluster Coordinator is known, heartbeats are * sent directly to the Cluster Coordinator. */ public class ClusterProtocolHeartbeater implements Heartbeater { private static final Logger logger = LoggerFactory.getLogger(ClusterProtocolHeartbeater.class); private final NodeProtocolSender protocolSender; - private final CuratorFramework curatorClient; - private final String nodesPathPrefix; - - private final String coordinatorPath; - private volatile String coordinatorAddress; + private final LeaderElectionManager electionManager; + private final ClusterCoordinator clusterCoordinator; - - public ClusterProtocolHeartbeater(final NodeProtocolSender protocolSender, final Properties properties) { + public ClusterProtocolHeartbeater(final NodeProtocolSender protocolSender, final ClusterCoordinator clusterCoordinator, final LeaderElectionManager electionManager) { this.protocolSender = protocolSender; - - final RetryPolicy retryPolicy = new RetryNTimes(10, 500); - final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(properties); - - curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(), - zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy); - - curatorClient.start(); - nodesPathPrefix = zkConfig.resolvePath("cluster/nodes"); - coordinatorPath = nodesPathPrefix + "/coordinator"; + this.clusterCoordinator = clusterCoordinator; + this.electionManager = electionManager; } @Override public String getHeartbeatAddress() throws IOException { - final String curAddress = coordinatorAddress; - if (curAddress != null) { - return curAddress; + final String heartbeatAddress = electionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR); + if (heartbeatAddress == null) { + throw new ProtocolException("Cannot send heartbeat because there is no Cluster Coordinator currently elected"); } - try { - // Get coordinator address and add watcher to change who we are heartbeating to if the value changes. - final byte[] coordinatorAddressBytes = curatorClient.getData().usingWatcher(new Watcher() { - @Override - public void process(final WatchedEvent event) { - coordinatorAddress = null; - } - }).forPath(coordinatorPath); - final String address = coordinatorAddress = new String(coordinatorAddressBytes, StandardCharsets.UTF_8); - - logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address); - return address; - } catch (Exception e) { - throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e); - } + return heartbeatAddress; } - @Override public synchronized void send(final HeartbeatMessage heartbeatMessage) throws IOException { final String heartbeatAddress = getHeartbeatAddress(); - - try { - protocolSender.heartbeat(heartbeatMessage, heartbeatAddress); - } catch (final ProtocolException pe) { - // a ProtocolException is likely the result of not being able to communicate - // with the coordinator. If we do get an IOException communicating with the coordinator, - // it will be the cause of the Protocol Exception. In this case, set coordinatorAddress - // to null so that we double-check next time that the coordinator has not changed. - if (pe.getCause() instanceof IOException) { - coordinatorAddress = null; + final HeartbeatResponseMessage responseMessage = protocolSender.heartbeat(heartbeatMessage, heartbeatAddress); + + final byte[] payloadBytes = heartbeatMessage.getHeartbeat().getPayload(); + final HeartbeatPayload payload = HeartbeatPayload.unmarshal(payloadBytes); + final List<NodeConnectionStatus> nodeStatusList = payload.getClusterStatus(); + final Map<NodeIdentifier, Long> updateIdMap = nodeStatusList.stream().collect( + Collectors.toMap(status -> status.getNodeIdentifier(), status -> status.getUpdateIdentifier())); + + final List<NodeConnectionStatus> updatedStatuses = responseMessage.getUpdatedNodeStatuses(); + if (updatedStatuses != null) { + for (final NodeConnectionStatus updatedStatus : updatedStatuses) { + final NodeIdentifier nodeId = updatedStatus.getNodeIdentifier(); + final Long updateId = updateIdMap.get(nodeId); + + final boolean updated = clusterCoordinator.resetNodeStatus(updatedStatus, updateId == null ? -1L : updateId); + if (updated) { + logger.info("After receiving heartbeat response, updated status of {} to {}", updatedStatus.getNodeIdentifier(), updatedStatus); + } else { + logger.debug("After receiving heartbeat response, did not update status of {} to {} because the update is out-of-date", updatedStatus.getNodeIdentifier(), updatedStatus); + } } - - throw pe; } } - @Override public void close() throws IOException { - if (curatorClient != null) { - curatorClient.close(); - } - - logger.info("ZooKeeper heartbeater closed. Will no longer send Heartbeat messages to ZooKeeper"); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java index 7bf7494..1435182 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java @@ -21,12 +21,14 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import org.apache.commons.lang3.StringUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; +import org.apache.curator.framework.recipes.leader.Participant; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryForever; import org.apache.nifi.controller.cluster.ZooKeeperClientConfig; @@ -47,7 +49,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { private volatile boolean stopped = true; private final Map<String, LeaderRole> leaderRoles = new HashMap<>(); - private final Map<String, LeaderElectionStateChangeListener> registeredRoles = new HashMap<>(); + private final Map<String, RegisteredRole> registeredRoles = new HashMap<>(); public CuratorLeaderElectionManager(final int threadPoolSize) { @@ -82,8 +84,9 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { // Call #register for each already-registered role. This will // cause us to start listening for leader elections for that // role again - for (final Map.Entry<String, LeaderElectionStateChangeListener> entry : registeredRoles.entrySet()) { - register(entry.getKey(), entry.getValue()); + for (final Map.Entry<String, RegisteredRole> entry : registeredRoles.entrySet()) { + final RegisteredRole role = entry.getValue(); + register(entry.getKey(), role.getListener(), role.getParticipantId()); } logger.info("{} started", this); @@ -97,7 +100,12 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { @Override - public synchronized void register(final String roleName, final LeaderElectionStateChangeListener listener) { + public void register(String roleName, LeaderElectionStateChangeListener listener) { + register(roleName, listener, null); + } + + @Override + public synchronized void register(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) { logger.debug("{} Registering new Leader Selector for role {}", this, roleName); if (leaderRoles.containsKey(roleName)) { @@ -114,18 +122,23 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { throw new IllegalStateException("Cannot register leader election for role '" + roleName + "' because this is not a valid role name"); } - registeredRoles.put(roleName, listener); + registeredRoles.put(roleName, new RegisteredRole(participantId, listener)); if (!isStopped()) { final ElectionListener electionListener = new ElectionListener(roleName, listener); final LeaderSelector leaderSelector = new LeaderSelector(curatorClient, leaderPath, leaderElectionMonitorEngine, electionListener); leaderSelector.autoRequeue(); + if (participantId != null) { + leaderSelector.setId(participantId); + } + leaderSelector.start(); final LeaderRole leaderRole = new LeaderRole(leaderSelector, electionListener); leaderRoles.put(roleName, leaderRole); } + logger.info("{} Registered new Leader Selector for role {}", this, roleName); } @@ -185,6 +198,32 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { return role.isLeader(); } + @Override + public synchronized String getLeader(final String roleName) { + final LeaderRole role = leaderRoles.get(roleName); + if (role == null) { + return null; + } + + Participant participant; + try { + participant = role.getLeaderSelector().getLeader(); + } catch (Exception e) { + logger.debug("Unable to determine leader for role '{}'; returning null", roleName); + return null; + } + + if (participant == null) { + return null; + } + + final String participantId = participant.getId(); + if (StringUtils.isEmpty(participantId)) { + return null; + } + + return participantId; + } private static class LeaderRole { private final LeaderSelector leaderSelector; @@ -204,6 +243,23 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { } } + private static class RegisteredRole { + private final LeaderElectionStateChangeListener listener; + private final String participantId; + + public RegisteredRole(final String participantId, final LeaderElectionStateChangeListener listener) { + this.participantId = participantId; + this.listener = listener; + } + + public LeaderElectionStateChangeListener getListener() { + return listener; + } + + public String getParticipantId() { + return participantId; + } + } private class ElectionListener extends LeaderSelectorListenerAdapter implements LeaderSelectorListener { private final String roleName; http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java index d16dbdb..ef36528 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java @@ -31,7 +31,7 @@ public interface LeaderElectionManager { void register(String roleName); /** - * Adds a new role for which a leader is required + * Adds a new role for which a leader is required, without providing a Participant ID * * @param roleName the name of the role * @param listener a listener that will be called when the node gains or relinquishes @@ -40,6 +40,28 @@ public interface LeaderElectionManager { void register(String roleName, LeaderElectionStateChangeListener listener); /** + * Adds a new role for which a leader is required, providing the given value for this node as the Participant ID + * + * @param roleName the name of the role + * @param listener a listener that will be called when the node gains or relinquishes + * the role of leader + * @param participantId the ID to register as this node's Participant ID. All nodes will see this as the identifier when + * asking to see who the leader is via the {@link #getLeader(String)} method + */ + void register(String roleName, LeaderElectionStateChangeListener listener, String participantId); + + /** + * Returns the Participant ID of the node that is elected the leader, if one was provided when the node registered + * for the role via {@link #register(String, LeaderElectionStateChangeListener, String)}. If there is currently no leader + * known or if the role was registered without providing a Participant ID, this will return <code>null</code>. + * + * @param roleName the name of the role + * @return the Participant ID of the node that is elected leader, or <code>null</code> if either no leader is known or the leader + * did not register with a Participant ID. + */ + String getLeader(String roleName); + + /** * Removes the role with the given name from this manager. If this * node is the elected leader for the given role, this node will relinquish * the leadership role http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java new file mode 100644 index 0000000..a2ed86e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.leader.election; + +/** + * <p> + * A LeaderElectionManager to use when running a standalone (un-clustered) NiFi instance + * </p> + */ +public class StandaloneLeaderElectionManager implements LeaderElectionManager { + + @Override + public void start() { + } + + @Override + public void register(final String roleName) { + } + + @Override + public void register(final String roleName, final LeaderElectionStateChangeListener listener) { + } + + @Override + public void register(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) { + } + + @Override + public String getLeader(final String roleName) { + return null; + } + + @Override + public void unregister(final String roleName) { + } + + @Override + public boolean isLeader(final String roleName) { + return false; + } + + @Override + public boolean isStopped() { + return false; + } + + @Override + public void stop() { + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index 88fbcdd..d50d31b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -908,6 +908,7 @@ public final class FingerprintFactory { builder.append(dto.getName()); builder.append(dto.getComments()); builder.append(dto.getAnnotationData()); + builder.append(dto.getState()); final Map<String, String> properties = dto.getProperties(); if (properties == null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java index 2760ca9..7ed9187 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java @@ -22,13 +22,12 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor; import org.apache.nifi.cluster.protocol.NodeProtocolSender; import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.leader.election.LeaderElectionManager; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.util.NiFiProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.FactoryBean; import org.springframework.context.ApplicationContext; @@ -40,8 +39,6 @@ import org.springframework.context.ApplicationContextAware; @SuppressWarnings("rawtypes") public class FlowControllerFactoryBean implements FactoryBean, ApplicationContextAware { - private static final Logger LOG = LoggerFactory.getLogger(FlowControllerFactoryBean.class); - private ApplicationContext applicationContext; private FlowController flowController; private NiFiProperties properties; @@ -51,6 +48,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex private BulletinRepository bulletinRepository; private ClusterCoordinator clusterCoordinator; private VariableRegistry variableRegistry; + private LeaderElectionManager leaderElectionManager; @Override public Object getObject() throws Exception { @@ -69,7 +67,9 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex nodeProtocolSender, bulletinRepository, clusterCoordinator, - heartbeatMonitor, variableRegistry); + heartbeatMonitor, + leaderElectionManager, + variableRegistry); } else { flowController = FlowController.createStandaloneInstance( flowFileEventRepository, @@ -129,4 +129,8 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) { this.clusterCoordinator = clusterCoordinator; } + + public void setLeaderElectionManager(final LeaderElectionManager leaderElectionManager) { + this.leaderElectionManager = leaderElectionManager; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/LeaderElectionManagerFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/LeaderElectionManagerFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/LeaderElectionManagerFactoryBean.java new file mode 100644 index 0000000..f17cf1b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/LeaderElectionManagerFactoryBean.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.spring; + +import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager; +import org.apache.nifi.controller.leader.election.LeaderElectionManager; +import org.apache.nifi.controller.leader.election.StandaloneLeaderElectionManager; +import org.apache.nifi.util.NiFiProperties; +import org.springframework.beans.factory.FactoryBean; + +public class LeaderElectionManagerFactoryBean implements FactoryBean<LeaderElectionManager> { + private int numThreads; + private NiFiProperties properties; + + @Override + public LeaderElectionManager getObject() throws Exception { + final boolean isNode = properties.isNode(); + if (isNode) { + return new CuratorLeaderElectionManager(numThreads, properties); + } else { + return new StandaloneLeaderElectionManager(); + } + } + + @Override + public Class<?> getObjectType() { + return LeaderElectionManager.class; + } + + @Override + public boolean isSingleton() { + return true; + } + + public void setNumThreads(final int numThreads) { + this.numThreads = numThreads; + } + + public void setProperties(final NiFiProperties properties) { + this.properties = properties; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml index f03e26c..1503208 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml @@ -47,6 +47,7 @@ <property name="bulletinRepository" ref="bulletinRepository" /> <property name="clusterCoordinator" ref="clusterCoordinator" /> <property name="variableRegistry" ref="variableRegistry"/> + <property name="leaderElectionManager" ref="leaderElectionManager" /> </bean> <!-- flow service --> http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java index af73eef..429a791 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import org.apache.nifi.cluster.protocol.HeartbeatPayload; import org.apache.nifi.util.NiFiProperties; import org.junit.Before; import org.junit.BeforeClass; http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java index b25c90b..cbb96b1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java @@ -18,10 +18,14 @@ package org.apache.nifi.nar; import java.io.Closeable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * */ public class NarCloseable implements Closeable { + private static final Logger logger = LoggerFactory.getLogger(NarCloseable.class); public static NarCloseable withNarLoader() { final ClassLoader current = Thread.currentThread().getContextClassLoader(); @@ -29,6 +33,31 @@ public class NarCloseable implements Closeable { return new NarCloseable(current); } + /** + * Creates a Closeable object that can be used to to switch to current class loader to the framework class loader + * and will automatically set the ClassLoader back to the previous class loader when closed + * + * @return a NarCloseable + */ + public static NarCloseable withFrameworkNar() { + final ClassLoader frameworkClassLoader; + try { + frameworkClassLoader = NarClassLoaders.getInstance().getFrameworkClassLoader(); + } catch (final Exception e) { + // This should never happen in a running instance, but it will occur in unit tests + logger.error("Unable to access Framework ClassLoader due to " + e + ". Will continue without change ClassLoaders."); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + + return new NarCloseable(null); + } + + final ClassLoader current = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(frameworkClassLoader); + return new NarCloseable(current); + } + private final ClassLoader toSet; private NarCloseable(final ClassLoader toSet) { http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index d336b51..2b269a7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -42,6 +42,7 @@ import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor; import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat; +import org.apache.nifi.cluster.coordination.node.ClusterRoles; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; @@ -65,6 +66,7 @@ import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.Snippet; import org.apache.nifi.controller.Template; import org.apache.nifi.controller.label.Label; +import org.apache.nifi.controller.leader.election.LeaderElectionManager; import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceReference; @@ -261,6 +263,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { private AccessPolicyDAO accessPolicyDAO; private ClusterCoordinator clusterCoordinator; private HeartbeatMonitor heartbeatMonitor; + private LeaderElectionManager leaderElectionManager; // administrative services private AuditService auditService; @@ -3116,19 +3119,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { clusterDto.setGenerated(new Date()); // create node dtos - final Collection<NodeDTO> nodeDtos = new ArrayList<>(); + final List<NodeDTO> nodeDtos = clusterCoordinator.getNodeIdentifiers().stream() + .map(nodeId -> getNode(nodeId)) + .collect(Collectors.toList()); clusterDto.setNodes(nodeDtos); - for (final NodeIdentifier nodeId : clusterCoordinator.getNodeIdentifiers()) { - final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId); - if (status == null) { - continue; - } - - final List<NodeEvent> events = clusterCoordinator.getNodeEvents(nodeId); - final Set<String> nodeRoles = clusterCoordinator.getConnectionStatus(nodeId).getRoles(); - final NodeHeartbeat heartbeat = heartbeatMonitor.getLatestHeartbeat(nodeId); - nodeDtos.add(dtoFactory.createNodeDTO(nodeId, status, heartbeat, events, nodeRoles)); - } return clusterDto; } @@ -3142,11 +3136,29 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { private NodeDTO getNode(final NodeIdentifier nodeId) { final NodeConnectionStatus nodeStatus = clusterCoordinator.getConnectionStatus(nodeId); final List<NodeEvent> events = clusterCoordinator.getNodeEvents(nodeId); - final Set<String> roles = clusterCoordinator.getConnectionStatus(nodeId).getRoles(); + final Set<String> roles = getRoles(nodeId); final NodeHeartbeat heartbeat = heartbeatMonitor.getLatestHeartbeat(nodeId); return dtoFactory.createNodeDTO(nodeId, nodeStatus, heartbeat, events, roles); } + private Set<String> getRoles(final NodeIdentifier nodeId) { + final Set<String> roles = new HashSet<>(); + final String nodeAddress = nodeId.getSocketAddress() + ":" + nodeId.getSocketPort(); + + for (final String roleName : ClusterRoles.getAllRoles()) { + final String leader = leaderElectionManager.getLeader(roleName); + if (leader == null) { + continue; + } + + if (leader.equals(nodeAddress)) { + roles.add(roleName); + } + } + + return roles; + } + @Override public void deleteNode(final String nodeId) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); @@ -3290,4 +3302,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { public void setBulletinRepository(final BulletinRepository bulletinRepository) { this.bulletinRepository = bulletinRepository; } + + public void setLeaderElectionManager(final LeaderElectionManager leaderElectionManager) { + this.leaderElectionManager = leaderElectionManager; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml index 28dbb62..614043a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml @@ -163,6 +163,7 @@ <property name="clusterCoordinator" ref="clusterCoordinator"/> <property name="heartbeatMonitor" ref="heartbeatMonitor" /> <property name="bulletinRepository" ref="bulletinRepository"/> + <property name="leaderElectionManager" ref="leaderElectionManager" /> </bean> <!-- component ui extension configuration context -->
