http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java deleted file mode 100644 index f9dfb00..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.event; - -import java.util.List; - -/** - * Manages an ordered list of events. The event history size dictates the total - * number of events to manage for a given source at a given time. When the size - * is exceeded, the oldest event for that source is evicted. - * - * @author unattributed - */ -public interface EventManager { - - /** - * Adds an event to the manager. 
- * - * @param event an Event - */ - void addEvent(Event event); - - /** - * Returns a list of events for a given source sorted by the event's - * timestamp where the most recent event is first in the list. - * - * @param eventSource the source - * - * @return the list of events - */ - List<Event> getEvents(String eventSource); - - /* - * Returns the most recent event for the source. If no events exist, then - * null is returned. - */ - Event getMostRecentEvent(String eventSource); - - /* - * Clears all events for the given source. - */ - void clearEventHistory(String eventSource); - - /** - * Returns the history size. - * - * @return the history size - */ - int getEventHistorySize(); - -}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java deleted file mode 100644 index 7fadc78..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.event.impl; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; -import java.util.Queue; -import org.apache.nifi.cluster.event.Event; -import org.apache.nifi.cluster.event.EventManager; - -/** - * Implements the EventManager. - * - * @author unattributed - */ -public class EventManagerImpl implements EventManager { - - /** - * associates the source ID with an ordered queue of events, ordered by most - * recent event - */ - private final Map<String, Queue<Event>> eventsMap = new HashMap<>(); - - /** - * the number of events to maintain for a given source - */ - private final int eventHistorySize; - - /** - * Creates an instance. - * - * @param eventHistorySize the number of events to manage for a given - * source. Value must be positive. - */ - public EventManagerImpl(final int eventHistorySize) { - if (eventHistorySize <= 0) { - throw new IllegalArgumentException("Event history size must be positive: " + eventHistorySize); - } - this.eventHistorySize = eventHistorySize; - } - - @Override - public void addEvent(final Event event) { - - if (event == null) { - throw new IllegalArgumentException("Event may not be null."); - } - - Queue<Event> events = eventsMap.get(event.getSource()); - if (events == null) { - // no events from this source, so add a new queue to the map - events = new PriorityQueue<>(eventHistorySize, createEventComparator()); - eventsMap.put(event.getSource(), events); - } - - // add event - events.add(event); - - // if we exceeded the history size, then evict the oldest event - if (events.size() > eventHistorySize) { - removeOldestEvent(events); - } - - } - - @Override - public List<Event> getEvents(final String eventSource) { - final Queue<Event> events = eventsMap.get(eventSource); - if (events == null) { - return Collections.EMPTY_LIST; 
- } else { - return Collections.unmodifiableList(new ArrayList<>(events)); - } - } - - @Override - public int getEventHistorySize() { - return eventHistorySize; - } - - @Override - public Event getMostRecentEvent(final String eventSource) { - final Queue<Event> events = eventsMap.get(eventSource); - if (events == null) { - return null; - } else { - return events.peek(); - } - } - - @Override - public void clearEventHistory(final String eventSource) { - eventsMap.remove(eventSource); - } - - private Comparator createEventComparator() { - return new Comparator<Event>() { - @Override - public int compare(final Event o1, final Event o2) { - // orders events by most recent first - return (int) (o2.getTimestamp() - o1.getTimestamp()); - } - }; - } - - private void removeOldestEvent(final Collection<Event> events) { - - if (events.isEmpty()) { - return; - } - - Event oldestEvent = null; - for (final Event event : events) { - if (oldestEvent == null || oldestEvent.getTimestamp() > event.getTimestamp()) { - oldestEvent = event; - } - } - - events.remove(oldestEvent); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java deleted file mode 100644 index 2e3d278..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license 
/**
 * Defines the interface for restricting external client connections to a set of
 * hosts or IPs.
 */
public interface ClusterNodeFirewall {

    /**
     * Decides whether the given host or IP is permissible through the
     * firewall.
     *
     * If an IP is given, then it must be formatted in dotted decimal notation.
     *
     * @param hostOrIp the host name or dotted-decimal IP address to check
     * @return true if the host or IP is permissible; false otherwise
     */
    boolean isPermissible(String hostOrIp);

}
- */ -package org.apache.nifi.cluster.firewall.impl; - -import java.io.*; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.*; -import org.apache.commons.net.util.SubnetUtils; -import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; -import org.apache.nifi.util.file.FileUtils; -import org.apache.nifi.logging.NiFiLog; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A file-based implementation of the ClusterFirewall interface. The class is - * configured with a file. If the file is empty, then everything is permissible. - * Otherwise, the file should contain hostnames or IPs formatted as dotted - * decimals with an optional CIDR suffix. Each entry must be separated by a - * newline. An example configuration is given below: - * - * <code> - * # hash character is a comment delimiter - * 1.2.3.4 # exact IP - * some.host.name # a host name - * 4.5.6.7/8 # range of CIDR IPs - * 9.10.11.12/13 # a smaller range of CIDR IPs - * </code> - * - * This class allows for synchronization with an optionally configured restore - * directory. If configured, then at startup, if the either the config file or - * the restore directory's copy is missing, then the configuration file will be - * copied to the appropriate location. If both restore directory contains a copy - * that is different in content to configuration file, then an exception is - * thrown at construction time. 
- */ -public class FileBasedClusterNodeFirewall implements ClusterNodeFirewall { - - private final File config; - - private final File restoreDirectory; - - private final Collection<SubnetUtils.SubnetInfo> subnetInfos = new ArrayList<>(); - - private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(FileBasedClusterNodeFirewall.class)); - - public FileBasedClusterNodeFirewall(final File config) throws IOException { - this(config, null); - } - - public FileBasedClusterNodeFirewall(final File config, final File restoreDirectory) throws IOException { - - if (config == null) { - throw new IllegalArgumentException("Firewall configuration file may not be null."); - } - - this.config = config; - this.restoreDirectory = restoreDirectory; - - if (restoreDirectory != null) { - // synchronize with restore directory - try { - syncWithRestoreDirectory(); - } catch (final IOException ioe) { - throw new RuntimeException(ioe); - } - } - - if (!config.exists() && !config.createNewFile()) { - throw new IOException("Firewall configuration file did not exist and could not be created: " + config.getAbsolutePath()); - } - - logger.info("Loading cluster firewall configuration."); - parseConfig(config); - logger.info("Cluster firewall configuration loaded."); - } - - @Override - public boolean isPermissible(final String hostOrIp) { - try { - - // if no rules, then permit everything - if (subnetInfos.isEmpty()) { - return true; - } - - final String ip; - try { - ip = InetAddress.getByName(hostOrIp).getHostAddress(); - } catch (final UnknownHostException uhe) { - logger.warn("Blocking unknown host: " + hostOrIp, uhe); - return false; - } - - // check each subnet to see if IP is in range - for (final SubnetUtils.SubnetInfo subnetInfo : subnetInfos) { - if (subnetInfo.isInRange(ip)) { - return true; - } - } - - // no match - return false; - - } catch (final IllegalArgumentException iae) { - return false; - } - } - - private void syncWithRestoreDirectory() throws IOException { - 
- // sanity check that restore directory is a directory, creating it if necessary - FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory); - - // check that restore directory is not the same as the primary directory - if (config.getParentFile().getAbsolutePath().equals(restoreDirectory.getAbsolutePath())) { - throw new IllegalStateException( - String.format("Cluster firewall configuration file '%s' cannot be in the restore directory '%s' ", - config.getAbsolutePath(), restoreDirectory.getAbsolutePath())); - } - - // the restore copy will have same file name, but reside in a different directory - final File restoreFile = new File(restoreDirectory, config.getName()); - - // sync the primary copy with the restore copy - FileUtils.syncWithRestore(config, restoreFile, logger); - - } - - private void parseConfig(final File config) throws IOException { - - // clear old information - subnetInfos.clear(); - try (BufferedReader br = new BufferedReader(new FileReader(config))) { - - String ipOrHostLine; - String ipCidr; - int totalIpsAdded = 0; - while ((ipOrHostLine = br.readLine()) != null) { - - // cleanup whitespace - ipOrHostLine = ipOrHostLine.trim(); - - if (ipOrHostLine.isEmpty() || ipOrHostLine.startsWith("#")) { - // skip empty lines or comments - continue; - } else if (ipOrHostLine.contains("#")) { - // parse out comments in IP containing lines - ipOrHostLine = ipOrHostLine.substring(0, ipOrHostLine.indexOf("#")).trim(); - } - - // if given a complete IP, then covert to CIDR - if (ipOrHostLine.contains("/")) { - ipCidr = ipOrHostLine; - } else if (ipOrHostLine.contains("\\")) { - logger.warn("CIDR IP notation uses forward slashes '/'. 
Replacing backslash '\\' with forward slash'/' for '" + ipOrHostLine + "'"); - ipCidr = ipOrHostLine.replace("\\", "/"); - } else { - try { - ipCidr = InetAddress.getByName(ipOrHostLine).getHostAddress(); - if (!ipOrHostLine.equals(ipCidr)) { - logger.debug(String.format("Resolved host '%s' to ip '%s'", ipOrHostLine, ipCidr)); - } - ipCidr += "/32"; - logger.debug("Adding CIDR to exact IP: " + ipCidr); - } catch (final UnknownHostException uhe) { - logger.warn("Firewall is skipping unknown host address: " + ipOrHostLine); - continue; - } - } - - try { - logger.debug("Adding CIDR IP to firewall: " + ipCidr); - final SubnetUtils subnetUtils = new SubnetUtils(ipCidr); - subnetUtils.setInclusiveHostCount(true); - subnetInfos.add(subnetUtils.getInfo()); - totalIpsAdded++; - } catch (final IllegalArgumentException iae) { - logger.warn("Firewall is skipping invalid CIDR address: " + ipOrHostLine); - } - - } - - if (totalIpsAdded == 0) { - logger.info("No IPs added to firewall. Firewall will accept all requests."); - } else { - logger.info(String.format("Added %d IP(s) to firewall. 
Only requests originating from the configured IPs will be accepted.", totalIpsAdded)); - } - - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java deleted file mode 100644 index eedb88f..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.flow; - -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.cluster.protocol.StandardDataFlow; - -/** - * A dataflow with additional information about the cluster. 
- * - * @author unattributed - */ -public class ClusterDataFlow { - - private final StandardDataFlow dataFlow; - - private final NodeIdentifier primaryNodeId; - - public ClusterDataFlow(final StandardDataFlow dataFlow, final NodeIdentifier primaryNodeId) { - this.dataFlow = dataFlow; - this.primaryNodeId = primaryNodeId; - } - - public NodeIdentifier getPrimaryNodeId() { - return primaryNodeId; - } - - public StandardDataFlow getDataFlow() { - return dataFlow; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java deleted file mode 100644 index 6ff15a7..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * A base exception for data access exceptions. It is unchecked and mirrors
 * the four standard RuntimeException constructors.
 *
 * @author unattributed
 */
public class DaoException extends RuntimeException {

    /**
     * Creates an exception with a message and a cause.
     *
     * @param msg the detail message
     * @param cause the underlying cause
     */
    public DaoException(final String msg, final Throwable cause) {
        super(msg, cause);
    }

    /**
     * Creates an exception with a cause only.
     *
     * @param cause the underlying cause
     */
    public DaoException(final Throwable cause) {
        super(cause);
    }

    /**
     * Creates an exception with a message only.
     *
     * @param msg the detail message
     */
    public DaoException(final String msg) {
        super(msg);
    }

    /**
     * Creates an exception with neither message nor cause.
     */
    public DaoException() {
        super();
    }
}
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.flow; - -/** - * A data access object for loading and saving the flow managed by the cluster. - * - * @author unattributed - */ -public interface DataFlowDao { - - /** - * Loads the cluster's dataflow. - * - * @return the dataflow or null if no dataflow exists - * - * @throws DaoException if the dataflow was unable to be loaded - */ - ClusterDataFlow loadDataFlow() throws DaoException; - - /** - * Saves the cluster's dataflow. - * - * - * @param dataFlow - * @throws DaoException if the dataflow was unable to be saved - */ - void saveDataFlow(ClusterDataFlow dataFlow) throws DaoException; - - /** - * Sets the state of the dataflow. If the dataflow does not exist, then an - * exception is thrown. - * - * @param flowState the state of the dataflow - * - * @throws DaoException if the state was unable to be updated - */ - void setPersistedFlowState(PersistedFlowState flowState) throws DaoException; - - /** - * Gets the state of the dataflow. 
- * - * @return the state of the dataflow - * - * @throws DaoException if the state was unable to be retrieved - */ - PersistedFlowState getPersistedFlowState() throws DaoException; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java deleted file mode 100644 index 339d904..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.flow; - -import java.util.Set; -import org.apache.nifi.cluster.protocol.NodeIdentifier; - -/** - * A service for managing the cluster's flow. 
The service will attempt to keep - * the cluster's dataflow current while respecting the value of the configured - * retrieval delay. - * - * The eligible retrieval time is reset with the configured delay every time the - * flow state is set to STALE. If the state is set to UNKNOWN or CURRENT, then - * the flow will not be retrieved. - * - * Clients must call start() and stop() to initialize and stop the instance. - * - * @author unattributed - */ -public interface DataFlowManagementService { - - /** - * Starts the instance. Start may only be called if the instance is not - * running. - */ - void start(); - - /** - * Stops the instance. Stop may only be called if the instance is running. - */ - void stop(); - - /** - * @return true if the instance is started; false otherwise. - */ - boolean isRunning(); - - /** - * Loads the dataflow. - * - * @return the dataflow or null if no dataflow exists - */ - ClusterDataFlow loadDataFlow(); - - /** - * Updates the dataflow with the given primary node identifier. - * - * @param nodeId the node identifier - * - * @throws DaoException if the update failed - */ - void updatePrimaryNode(NodeIdentifier nodeId) throws DaoException; - - /** - * Sets the state of the flow. - * - * @param flowState the state - * - * @see PersistedFlowState - */ - void setPersistedFlowState(PersistedFlowState flowState); - - /** - * @return the state of the flow - */ - PersistedFlowState getPersistedFlowState(); - - /** - * @return true if the flow is current; false otherwise. - */ - boolean isFlowCurrent(); - - /** - * Sets the node identifiers to use when attempting to retrieve the flow. - * - * @param nodeIds the node identifiers - */ - void setNodeIds(Set<NodeIdentifier> nodeIds); - - /** - * Returns the set of node identifiers the service is using to retrieve the - * flow. - * - * @return the set of node identifiers the service is using to retrieve the - * flow. 
- */ - Set<NodeIdentifier> getNodeIds(); - - /** - * @return the retrieval delay in seconds - */ - int getRetrievalDelaySeconds(); - - /** - * Sets the retrieval delay. - * - * @param delay the retrieval delay in seconds - */ - void setRetrievalDelay(String delay); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java deleted file mode 100644 index b3afc6e..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.flow; - -/** - * Represents the various state of a flow managed by the cluster. 
/**
 * Represents the various states of a flow managed by the cluster.
 *
 * @author unattributed
 */
public enum PersistedFlowState {

    /** The flow is current. */
    CURRENT,

    /** The flow is not current, but is eligible to be updated. */
    STALE,

    /** The flow is not current and is not eligible to be updated. */
    UNKNOWN
}
- */ -package org.apache.nifi.cluster.flow; - -/** - * Represents the exceptional case when a caller is requesting the current flow, - * but a current flow is not available. - * - * @author unattributed - */ -public class StaleFlowException extends RuntimeException { - - public StaleFlowException(String message, Throwable cause) { - super(message, cause); - } - - public StaleFlowException(String message) { - super(message); - } - - public StaleFlowException(Throwable cause) { - super(cause); - } - - public StaleFlowException() { - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java deleted file mode 100644 index 72b594a..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java +++ /dev/null @@ -1,600 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.flow.impl; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.FilenameFilter; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.Arrays; -import java.util.UUID; - -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Marshaller; -import javax.xml.bind.Unmarshaller; -import javax.xml.bind.annotation.XmlRootElement; -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.transform.OutputKeys; -import javax.xml.transform.Transformer; -import javax.xml.transform.TransformerFactory; -import javax.xml.transform.dom.DOMSource; -import javax.xml.transform.stream.StreamResult; - -import org.apache.commons.compress.archivers.ArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; -import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; -import org.apache.nifi.cluster.flow.ClusterDataFlow; -import org.apache.nifi.cluster.flow.DaoException; -import org.apache.nifi.cluster.flow.DataFlowDao; -import org.apache.nifi.cluster.flow.PersistedFlowState; -import org.apache.nifi.cluster.protocol.DataFlow; -import 
org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.cluster.protocol.StandardDataFlow; -import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter; -import org.apache.nifi.logging.NiFiLog; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.stream.io.BufferedOutputStream; -import org.apache.nifi.stream.io.ByteArrayInputStream; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.util.file.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.w3c.dom.Document; -import org.w3c.dom.Element; - -/** - * Implements the FlowDao interface. The implementation tracks the state of the - * dataflow by annotating the filename of the flow state file. Specifically, the - * implementation correlates PersistedFlowState states to filename extensions. - * The correlation is as follows: - * <ul> - * <li> CURRENT maps to flow.xml </li> - * <li> STALE maps to flow.xml.stale </li> - * <li> UNKNOWN maps to flow.xml.unknown </li> - * </ul> - * Whenever the flow state changes, the flow state file's name is updated to - * denote its state. - * - * The implementation also provides for a restore directory that may be - * configured for higher availability. At instance creation, if the primary or - * restore directories have multiple flow state files, an exception is thrown. - * If the primary directory has a current flow state file, but the restore - * directory does not, then the primary flow state file is copied to the restore - * directory. If the restore directory has a current flow state file, but the - * primary directory does not, then the restore flow state file is copied to the - * primary directory. If both the primary and restore directories have a current - * flow state file and the files are different, then an exception is thrown. - * - * When the flow state file is saved, it is always saved first to the restore - * directory followed by a save to the primary directory. 
When the flow state - * file is loaded, a check is made to verify that the primary and restore flow - * state files are both current. If either is not current, then an exception is - * thrown. The primary flow state file is always read when the load method is - * called. - * - * @author unattributed - */ -public class DataFlowDaoImpl implements DataFlowDao { - - private final File primaryDirectory; - private final File restoreDirectory; - private final boolean autoStart; - private final String generatedRootGroupId = UUID.randomUUID().toString(); - - public static final String STALE_EXT = ".stale"; - public static final String UNKNOWN_EXT = ".unknown"; - public static final String FLOW_PACKAGE = "flow.tar"; - public static final String FLOW_XML_FILENAME = "flow.xml"; - public static final String TEMPLATES_FILENAME = "templates.xml"; - public static final String SNIPPETS_FILENAME = "snippets.xml"; - public static final String CLUSTER_INFO_FILENAME = "cluster-info.xml"; - - private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowDaoImpl.class)); - - public DataFlowDaoImpl(final File primaryDirectory) throws DaoException { - this(primaryDirectory, null, false); - } - - public DataFlowDaoImpl(final File primaryDirectory, final File restoreDirectory, final boolean autoStart) throws DaoException { - - // sanity check that primary directory is a directory, creating it if necessary - if (primaryDirectory == null) { - throw new IllegalArgumentException("Primary directory may not be null."); - } else if (!primaryDirectory.exists()) { - if (!primaryDirectory.mkdir()) { - throw new DaoException(String.format("Failed to create primary directory '%s'", primaryDirectory.getAbsolutePath())); - } - } else if (!primaryDirectory.isDirectory()) { - throw new IllegalArgumentException("Primary directory must be a directory."); - } - - this.autoStart = autoStart; - - try { - this.primaryDirectory = primaryDirectory; - this.restoreDirectory = restoreDirectory; - - 
if (restoreDirectory == null) { - // check that we have exactly one current flow state file - ensureSingleCurrentStateFile(primaryDirectory); - } else { - - // check that restore directory is a directory, creating it if necessary - FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory); - - // check that restore directory is not the same as the primary directory - if (primaryDirectory.getAbsolutePath().equals(restoreDirectory.getAbsolutePath())) { - throw new IllegalArgumentException(String.format("Primary directory '%s' is the same as restore directory '%s' ", - primaryDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath())); - } - - final File[] primaryFlowStateFiles = getFlowStateFiles(primaryDirectory); - final File[] restoreFlowStateFiles = getFlowStateFiles(restoreDirectory); - - // if more than one state file in either primary or restore, then throw exception - if (primaryFlowStateFiles.length > 1) { - throw new IllegalStateException(String.format("Found multiple dataflow state files in primary directory '%s'", primaryDirectory)); - } else if (restoreFlowStateFiles.length > 1) { - throw new IllegalStateException(String.format("Found multiple dataflow state files in restore directory '%s'", restoreDirectory)); - } - - // check that the single primary state file we found is current or create a new one - final File primaryFlowStateFile = ensureSingleCurrentStateFile(primaryDirectory); - - // check that the single restore state file we found is current or create a new one - final File restoreFlowStateFile = ensureSingleCurrentStateFile(restoreDirectory); - - // if there was a difference in flow state file directories, then copy the appropriate files - if (restoreFlowStateFiles.length == 0 && primaryFlowStateFiles.length != 0) { - // copy primary state file to restore - FileUtils.copyFile(primaryFlowStateFile, restoreFlowStateFile, false, false, logger); - } else if (primaryFlowStateFiles.length == 0 && restoreFlowStateFiles.length != 0) { - // 
copy restore state file to primary - FileUtils.copyFile(restoreFlowStateFile, primaryFlowStateFile, false, false, logger); - } else { - // sync the primary copy with the restore copy - syncWithRestore(primaryFlowStateFile, restoreFlowStateFile); - } - - } - } catch (final IOException | IllegalArgumentException | IllegalStateException | JAXBException ex) { - throw new DaoException(ex); - } - } - - - private void syncWithRestore(final File primaryFile, final File restoreFile) throws IOException { - try (final FileInputStream primaryFis = new FileInputStream(primaryFile); - final TarArchiveInputStream primaryIn = new TarArchiveInputStream(primaryFis); - final FileInputStream restoreFis = new FileInputStream(restoreFile); - final TarArchiveInputStream restoreIn = new TarArchiveInputStream(restoreFis)) { - - final ArchiveEntry primaryEntry = primaryIn.getNextEntry(); - final ArchiveEntry restoreEntry = restoreIn.getNextEntry(); - - if ( primaryEntry == null && restoreEntry == null ) { - return; - } - - if ( (primaryEntry == null && restoreEntry != null) || (primaryEntry != null && restoreEntry == null) ) { - throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'", - primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath())); - } - - final byte[] primaryMd5 = calculateMd5(primaryIn); - final byte[] restoreMd5 = calculateMd5(restoreIn); - - if ( !Arrays.equals(primaryMd5, restoreMd5) ) { - throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'", - primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath())); - } - } - } - - private byte[] calculateMd5(final InputStream in) throws IOException { - final MessageDigest digest; - try { - digest = MessageDigest.getInstance("MD5"); - } catch (final NoSuchAlgorithmException nsae) { - throw new IOException(nsae); - } - - int len; - final byte[] buffer = new byte[8192]; - while ((len = in.read(buffer)) > -1) { - if (len > 0) { - 
digest.update(buffer, 0, len); - } - } - return digest.digest(); - } - - @Override - public ClusterDataFlow loadDataFlow() throws DaoException { - try { - return parseDataFlow(getExistingFlowStateFile(primaryDirectory)); - } catch (final IOException | JAXBException ex) { - throw new DaoException(ex); - } - } - - @Override - public void saveDataFlow(final ClusterDataFlow dataFlow) throws DaoException { - try { - - final File primaryStateFile = getFlowStateFile(primaryDirectory); - - // write to restore before writing to primary in case primary experiences problems - if (restoreDirectory != null) { - final File restoreStateFile = getFlowStateFile(restoreDirectory); - if (restoreStateFile == null) { - if (primaryStateFile == null) { - writeDataFlow(createNewFlowStateFile(restoreDirectory), dataFlow); - } else { - throw new DaoException(String.format("Unable to save dataflow because dataflow state file in primary directory '%s' exists, but it does not exist in the restore directory '%s'", - primaryDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath())); - } - } else { - if (primaryStateFile == null) { - throw new DaoException(String.format("Unable to save dataflow because dataflow state file in restore directory '%s' exists, but it does not exist in the primary directory '%s'", - restoreDirectory.getAbsolutePath(), primaryDirectory.getAbsolutePath())); - } else { - final PersistedFlowState primaryFlowState = getPersistedFlowState(primaryStateFile); - final PersistedFlowState restoreFlowState = getPersistedFlowState(restoreStateFile); - if (primaryFlowState == restoreFlowState) { - writeDataFlow(restoreStateFile, dataFlow); - } else { - throw new DaoException(String.format("Unable to save dataflow because state file in primary directory '%s' has state '%s', but the state file in the restore directory '%s' has state '%s'", - primaryDirectory.getAbsolutePath(), primaryFlowState, restoreDirectory.getAbsolutePath(), restoreFlowState)); - } - } - } - } - - // write 
dataflow to primary - if (primaryStateFile == null) { - writeDataFlow(createNewFlowStateFile(primaryDirectory), dataFlow); - } else { - writeDataFlow(primaryStateFile, dataFlow); - } - - } catch (final IOException | JAXBException ex) { - throw new DaoException(ex); - } - } - - @Override - public PersistedFlowState getPersistedFlowState() { - // trust restore over primary if configured for restore - if (restoreDirectory == null) { - return getPersistedFlowState(getExistingFlowStateFile(primaryDirectory)); - } else { - return getPersistedFlowState(getExistingFlowStateFile(restoreDirectory)); - } - } - - @Override - public void setPersistedFlowState(final PersistedFlowState flowState) throws DaoException { - // rename restore before primary if configured for restore - if (restoreDirectory != null) { - renameFlowStateFile(getExistingFlowStateFile(restoreDirectory), flowState); - } - renameFlowStateFile(getExistingFlowStateFile(primaryDirectory), flowState); - } - - private File ensureSingleCurrentStateFile(final File dir) throws IOException, JAXBException { - - // ensure that we have at most one state file and if we have one, it is current - final File[] directoryFlowStateFiles = getFlowStateFiles(dir); - if (directoryFlowStateFiles.length > 1) { - throw new DaoException(String.format("Found multiple dataflow state files in directory '%s'", dir)); - } else if (directoryFlowStateFiles.length == 0) { - // create a new file if none exist - return createNewFlowStateFile(dir); - } else { - // check that the single flow state file is current - final PersistedFlowState flowState = getPersistedFlowState(directoryFlowStateFiles[0]); - if (PersistedFlowState.CURRENT == flowState) { - return directoryFlowStateFiles[0]; - } else { - throw new DaoException(String.format("Dataflow state file '%s' must be current.", directoryFlowStateFiles[0].getAbsolutePath())); - } - } - - } - - private PersistedFlowState getPersistedFlowState(final File file) { - final String path = 
file.getAbsolutePath(); - if (path.endsWith(STALE_EXT)) { - return PersistedFlowState.STALE; - } else if (path.endsWith(UNKNOWN_EXT)) { - return PersistedFlowState.UNKNOWN; - } else { - return PersistedFlowState.CURRENT; - } - } - - private File getFlowStateFile(final File dir) { - final File[] files = getFlowStateFiles(dir); - if (files.length > 1) { - throw new IllegalStateException(String.format("Expected at most one dataflow state file, but found %s files.", files.length)); - } else if (files.length == 0) { - return null; - } else { - return files[0]; - } - } - - private File getExistingFlowStateFile(final File dir) { - final File file = getFlowStateFile(dir); - if (file == null) { - throw new IllegalStateException(String.format("Expected a dataflow state file, but none existed in directory '%s'", dir.getAbsolutePath())); - } - return file; - } - - private File[] getFlowStateFiles(final File dir) { - final File[] files = dir.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return (name.equals(FLOW_PACKAGE) || name.endsWith(STALE_EXT) || name.endsWith(UNKNOWN_EXT)); - } - }); - - if (files == null) { - return new File[0]; - } else { - return files; - } - } - - private File removeStateFileExtension(final File file) { - - final String path = file.getAbsolutePath(); - final int stateFileExtIndex; - if (path.endsWith(STALE_EXT)) { - stateFileExtIndex = path.lastIndexOf(STALE_EXT); - } else if (path.endsWith(UNKNOWN_EXT)) { - stateFileExtIndex = path.lastIndexOf(UNKNOWN_EXT); - } else { - stateFileExtIndex = path.length(); - } - - return new File(path.substring(0, stateFileExtIndex)); - } - - private File addStateFileExtension(final File file, final PersistedFlowState state) { - switch (state) { - case CURRENT: { - return file; - } - case STALE: { - return new File(file.getAbsolutePath() + STALE_EXT); - } - case UNKNOWN: { - return new File(file.getAbsolutePath() + UNKNOWN_EXT); - } - default: { - throw new 
RuntimeException("Unsupported PersistedFlowState Enum value: " + state); - } - } - } - - private File createNewFlowStateFile(final File dir) throws IOException, JAXBException { - final File stateFile = new File(dir, FLOW_PACKAGE); - stateFile.createNewFile(); - - final byte[] flowBytes = getEmptyFlowBytes(); - final byte[] templateBytes = new byte[0]; - final byte[] snippetBytes = new byte[0]; - final DataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes); - - final ClusterMetadata clusterMetadata = new ClusterMetadata(); - writeDataFlow(stateFile, dataFlow, clusterMetadata); - - return stateFile; - } - - private byte[] getEmptyFlowBytes() throws IOException { - try { - final DocumentBuilder docBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder(); - final Document document = docBuilder.newDocument(); - - final Element controller = document.createElement("flowController"); - document.appendChild(controller); - - controller.appendChild(createTextElement(document, "maxThreadCount", "15")); - - final Element rootGroup = document.createElement("rootGroup"); - rootGroup.appendChild(createTextElement(document, "id", generatedRootGroupId)); - rootGroup.appendChild(createTextElement(document, "name", "NiFi Flow")); - - // create the position element - final Element positionElement = createTextElement(document, "position", ""); - positionElement.setAttribute("x", "0.0"); - positionElement.setAttribute("y", "0.0"); - rootGroup.appendChild(positionElement); - - rootGroup.appendChild(createTextElement(document, "comment", "")); - controller.appendChild(rootGroup); - - final Transformer transformer = TransformerFactory.newInstance().newTransformer(); - transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2"); - transformer.setOutputProperty(OutputKeys.INDENT, "yes"); - - final DOMSource source = new DOMSource(document); - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final StreamResult result = 
new StreamResult(baos); - transformer.transform(source, result); - - return baos.toByteArray(); - } catch (final Exception e) { - throw new IOException(e); - } - } - - private Element createTextElement(final Document document, final String elementName, final String value) { - final Element element = document.createElement(elementName); - element.setTextContent(value); - return element; - } - - private void renameFlowStateFile(final File flowStateFile, final PersistedFlowState newState) throws DaoException { - final PersistedFlowState existingState = getPersistedFlowState(flowStateFile); - if (existingState != newState) { - final File newFlowStateFile = addStateFileExtension(removeStateFileExtension(flowStateFile), newState); - if (flowStateFile.renameTo(newFlowStateFile) == false) { - throw new DaoException( - String.format("Failed to rename flow state file '%s' to new name '%s'", flowStateFile.getAbsolutePath(), newFlowStateFile.getAbsolutePath())); - } - } - } - - private ClusterDataFlow parseDataFlow(final File file) throws IOException, JAXBException, DaoException { - byte[] flowBytes = new byte[0]; - byte[] templateBytes = new byte[0]; - byte[] snippetBytes = new byte[0]; - byte[] clusterInfoBytes = new byte[0]; - - try (final InputStream inStream = new FileInputStream(file); - final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(inStream))) { - TarArchiveEntry tarEntry; - while ((tarEntry = tarIn.getNextTarEntry()) != null) { - switch (tarEntry.getName()) { - case FLOW_XML_FILENAME: - flowBytes = new byte[(int) tarEntry.getSize()]; - StreamUtils.fillBuffer(tarIn, flowBytes, true); - break; - case TEMPLATES_FILENAME: - templateBytes = new byte[(int) tarEntry.getSize()]; - StreamUtils.fillBuffer(tarIn, templateBytes, true); - break; - case SNIPPETS_FILENAME: - snippetBytes = new byte[(int) tarEntry.getSize()]; - StreamUtils.fillBuffer(tarIn, snippetBytes, true); - break; - case CLUSTER_INFO_FILENAME: - clusterInfoBytes = new 
byte[(int) tarEntry.getSize()]; - StreamUtils.fillBuffer(tarIn, clusterInfoBytes, true); - break; - default: - throw new DaoException("Found Unexpected file in dataflow configuration: " + tarEntry.getName()); - } - } - } - - final ClusterMetadata clusterMetadata; - if (clusterInfoBytes.length == 0) { - clusterMetadata = null; - } else { - final Unmarshaller clusterMetadataUnmarshaller = ClusterMetadata.jaxbCtx.createUnmarshaller(); - clusterMetadata = (ClusterMetadata) clusterMetadataUnmarshaller.unmarshal(new ByteArrayInputStream(clusterInfoBytes)); - } - - final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes); - dataFlow.setAutoStartProcessors(autoStart); - - return new ClusterDataFlow(dataFlow, (clusterMetadata == null) ? null : clusterMetadata.getPrimaryNodeId()); - } - - private void writeDataFlow(final File file, final ClusterDataFlow clusterDataFlow) throws IOException, JAXBException { - - // get the data flow - DataFlow dataFlow = clusterDataFlow.getDataFlow(); - - // if no dataflow, then write a new dataflow - if (dataFlow == null) { - dataFlow = new StandardDataFlow(new byte[0], new byte[0], new byte[0]); - } - - // setup the cluster metadata - final ClusterMetadata clusterMetadata = new ClusterMetadata(); - clusterMetadata.setPrimaryNodeId(clusterDataFlow.getPrimaryNodeId()); - - // write to disk - writeDataFlow(file, dataFlow, clusterMetadata); - } - - private void writeTarEntry(final TarArchiveOutputStream tarOut, final String filename, final byte[] bytes) throws IOException { - final TarArchiveEntry flowEntry = new TarArchiveEntry(filename); - flowEntry.setSize(bytes.length); - tarOut.putArchiveEntry(flowEntry); - tarOut.write(bytes); - tarOut.closeArchiveEntry(); - } - - private void writeDataFlow(final File file, final DataFlow dataFlow, final ClusterMetadata clusterMetadata) throws IOException, JAXBException { - - try (final OutputStream fos = new FileOutputStream(file); - final TarArchiveOutputStream tarOut 
= new TarArchiveOutputStream(new BufferedOutputStream(fos))) { - - writeTarEntry(tarOut, FLOW_XML_FILENAME, dataFlow.getFlow()); - writeTarEntry(tarOut, TEMPLATES_FILENAME, dataFlow.getTemplates()); - writeTarEntry(tarOut, SNIPPETS_FILENAME, dataFlow.getSnippets()); - - final ByteArrayOutputStream baos = new ByteArrayOutputStream(256); - writeClusterMetadata(clusterMetadata, baos); - final byte[] clusterInfoBytes = baos.toByteArray(); - - writeTarEntry(tarOut, CLUSTER_INFO_FILENAME, clusterInfoBytes); - } - } - - private void writeClusterMetadata(final ClusterMetadata clusterMetadata, final OutputStream os) throws IOException, JAXBException { - // write cluster metadata to output stream - final Marshaller marshaller = ClusterMetadata.jaxbCtx.createMarshaller(); - marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true); - marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true); - marshaller.setProperty(Marshaller.JAXB_ENCODING, "UTF-8"); - marshaller.marshal(clusterMetadata, os); - } - - @XmlRootElement(name = "clusterMetadata") - private static class ClusterMetadata { - - private NodeIdentifier primaryNodeId; - - private static final JAXBContext jaxbCtx; - - static { - try { - jaxbCtx = JAXBContext.newInstance(ClusterMetadata.class); - } catch (final JAXBException je) { - throw new RuntimeException(je); - } - } - - @XmlJavaTypeAdapter(NodeIdentifierAdapter.class) - public NodeIdentifier getPrimaryNodeId() { - return primaryNodeId; - } - - public void setPrimaryNodeId(final NodeIdentifier primaryNodeId) { - this.primaryNodeId = primaryNodeId; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java ---------------------------------------------------------------------- diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java deleted file mode 100644 index e135af3..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java +++ /dev/null @@ -1,356 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.flow.impl; - -import java.util.Collections; -import java.util.Date; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.nifi.cluster.flow.ClusterDataFlow; -import org.apache.nifi.cluster.flow.DaoException; -import org.apache.nifi.cluster.flow.DataFlowDao; -import org.apache.nifi.cluster.flow.DataFlowManagementService; -import org.apache.nifi.cluster.flow.PersistedFlowState; -import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.cluster.protocol.StandardDataFlow; -import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; -import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; -import org.apache.nifi.logging.NiFiLog; -import org.apache.nifi.util.FormatUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implements FlowManagementService interface. The service tries to keep the - * cluster's flow current with regards to the available nodes. - * - * The instance may be configured with a retrieval delay, which will reduce the - * number of retrievals performed by the service at the expense of increasing - * the chances that the service will not be able to provide a current flow to - * the caller. - * - * By default, the service will try to update the flow as quickly as possible. - * Configuring a delay enables a less aggressive retrieval strategy. - * Specifically, the eligible retrieval time is reset every time the flow state - * is set to STALE. 
If the state is set to UNKNOWN or CURRENT, then the flow - * will not be retrieved. - * - * @author unattributed - */ -public class DataFlowManagementServiceImpl implements DataFlowManagementService { - - /* - * Developer Note: - * - * This class maintains an ExecutorService and a Runnable. - * Although the class is not externally threadsafe, its internals are protected to - * accommodate multithread access between the ExecutorServer and the Runnable. - * - */ - private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowManagementServiceImpl.class)); - - private final DataFlowDao flowDao; - - private final ClusterManagerProtocolSender sender; - - private final Set<NodeIdentifier> nodeIds = new CopyOnWriteArraySet<>(); - - private final AtomicBoolean stopRequested = new AtomicBoolean(false); - - private final AtomicLong lastRetrievalTime = new AtomicLong(-1); - - private Timer flowRetriever; - - private long retrievableAfterTime = 0L; - - private AtomicInteger retrievalDelaySeconds = new AtomicInteger(0); - - private final TimingReentrantLock resourceLock = new TimingReentrantLock(new ReentrantLock()); - - public DataFlowManagementServiceImpl(final DataFlowDao flowDao, final ClusterManagerProtocolSender sender) { - if (flowDao == null) { - throw new IllegalArgumentException("Flow DAO may not be null."); - } else if (sender == null) { - throw new IllegalArgumentException("Cluster Manager Protocol Sender may not be null."); - } - this.flowDao = flowDao; - this.sender = sender; - } - - @Override - public void start() { - - if (isRunning()) { - throw new IllegalArgumentException("Instance is already running."); - } - - // reset stop requested - stopRequested.set(false); - - // setup flow retreiver timer - flowRetriever = new Timer("Flow Management Service", /* is daemon */ true); - flowRetriever.schedule(new FlowRetrieverTimerTask(), 0, 500); - } - - @Override - public boolean isRunning() { - return (flowRetriever != null); - } - - @Override - 
public void stop() { - - if (isRunning() == false) { - throw new IllegalArgumentException("Instance is already stopped."); - } - - // record stop request - stopRequested.set(true); - - flowRetriever.cancel(); - flowRetriever = null; - - } - - @Override - public ClusterDataFlow loadDataFlow() throws DaoException { - resourceLock.lock(); - try { - return flowDao.loadDataFlow(); - } finally { - resourceLock.unlock("loadDataFlow"); - } - } - - @Override - public void updatePrimaryNode(final NodeIdentifier nodeId) { - resourceLock.lock(); - try { - final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow(); - - final StandardDataFlow dataFlow; - if (existingClusterDataFlow == null) { - dataFlow = null; - } else { - dataFlow = existingClusterDataFlow.getDataFlow(); - } - - flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId)); - } finally { - resourceLock.unlock("updatePrimaryNode"); - } - } - - @Override - public PersistedFlowState getPersistedFlowState() { - resourceLock.lock(); - try { - return flowDao.getPersistedFlowState(); - } finally { - resourceLock.unlock("getPersistedFlowState"); - } - } - - @Override - public boolean isFlowCurrent() { - return PersistedFlowState.CURRENT == getPersistedFlowState(); - } - - @Override - public void setPersistedFlowState(final PersistedFlowState flowState) { - // lock to ensure state change and retrievable time update are atomic - resourceLock.lock(); - try { - flowDao.setPersistedFlowState(flowState); - if (PersistedFlowState.STALE == flowState) { - retrievableAfterTime = new Date().getTime() + (getRetrievalDelaySeconds() * 1000); - } else if (PersistedFlowState.UNKNOWN == flowState || PersistedFlowState.CURRENT == flowState) { - retrievableAfterTime = Long.MAX_VALUE; - } - } finally { - resourceLock.unlock("setPersistedFlowState"); - } - } - - @Override - public Set<NodeIdentifier> getNodeIds() { - return Collections.unmodifiableSet(nodeIds); - } - - @Override - public void setNodeIds(final Set<NodeIdentifier> 
nodeIds) { - - if (nodeIds == null) { - throw new IllegalArgumentException("Node IDs may not be null."); - } - - resourceLock.lock(); - try { - - if (this.nodeIds.equals(nodeIds)) { - return; - } - - this.nodeIds.clear(); - this.nodeIds.addAll(nodeIds); - - } finally { - resourceLock.unlock("setNodeIds"); - } - - } - - @Override - public int getRetrievalDelaySeconds() { - return retrievalDelaySeconds.get(); - } - - @Override - public void setRetrievalDelay(final String retrievalDelay) { - this.retrievalDelaySeconds.set((int) FormatUtils.getTimeDuration(retrievalDelay, TimeUnit.SECONDS)); - } - - public ClusterManagerProtocolSender getSender() { - return sender; - } - - public long getLastRetrievalTime() { - return lastRetrievalTime.get(); - } - - /** - * A timer task for issuing FlowRequestMessage messages to nodes to retrieve - * an updated flow. - */ - private class FlowRetrieverTimerTask extends TimerTask { - - @Override - public void run() { - - resourceLock.lock(); - try { - // if flow is current, then we're done - if (isFlowCurrent()) { - return; - } - } catch (final Exception ex) { - logger.info("Encountered exception checking if flow is current caused by " + ex, ex); - } finally { - resourceLock.unlock("FlowRetrieverTimerTask - isFlowCurrent"); - } - - final FlowRequestMessage request = new FlowRequestMessage(); - for (final NodeIdentifier nodeId : getNodeIds()) { - try { - // setup request - request.setNodeId(nodeId); - - // record request time - final long requestSentTime = new Date().getTime(); - - resourceLock.lock(); - try { - // sanity checks before making request - if (stopRequested.get()) { // did we receive a stop request - logger.debug("Stopping runnable prematurely because a request to stop was issued."); - return; - } else if (requestSentTime < retrievableAfterTime) { - /* - * Retrievable after time was updated while obtaining - * the lock, so try again later - */ - return; - } - } finally { - resourceLock.unlock("FlowRetrieverTimerTask - check 
stopRequested"); - } - - // send request - final FlowResponseMessage response = sender.requestFlow(request); - - resourceLock.lock(); - try { - // check if the retrieved flow is still valid - if (requestSentTime > retrievableAfterTime) { - logger.info("Saving retrieved flow."); - - final StandardDataFlow dataFlow = response.getDataFlow(); - final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow(); - final ClusterDataFlow currentClusterDataFlow; - if (existingClusterDataFlow == null) { - currentClusterDataFlow = new ClusterDataFlow(dataFlow, null); - } else { - currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId()); - } - flowDao.saveDataFlow(currentClusterDataFlow); - flowDao.setPersistedFlowState(PersistedFlowState.CURRENT); - lastRetrievalTime.set(new Date().getTime()); - } - - /* - * Retrievable after time was updated while requesting - * the flow, so try again later. - */ - } finally { - resourceLock.unlock("FlowRetrieverTimerTask - saveDataFlow"); - } - - } catch (final Throwable t) { - logger.info("Encountered exception retrieving flow from node " + nodeId + " caused by " + t, t); - } - } - } - } - - private static class TimingReentrantLock { - - private final Lock lock; - private static final Logger logger = LoggerFactory.getLogger("dataFlowManagementService.lock"); - - private final ThreadLocal<Long> lockTime = new ThreadLocal<>(); - - public TimingReentrantLock(final Lock lock) { - this.lock = lock; - } - - public void lock() { - lock.lock(); - lockTime.set(System.nanoTime()); - } - - public void unlock(final String task) { - final long nanosLocked = System.nanoTime() - lockTime.get(); - lock.unlock(); - - final long millisLocked = TimeUnit.MILLISECONDS.convert(nanosLocked, TimeUnit.NANOSECONDS); - if (millisLocked > 100L) { - logger.debug("Lock held for {} milliseconds for task: {}", millisLocked, task); - } - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java deleted file mode 100644 index 3a1dfb2..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.manager; - -import java.util.List; -import java.util.Set; - -import org.apache.nifi.cluster.event.Event; -import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException; -import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException; -import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException; -import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException; -import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException; -import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException; -import org.apache.nifi.cluster.manager.exception.UnknownNodeException; -import org.apache.nifi.cluster.node.Node; -import org.apache.nifi.cluster.node.Node.Status; -import org.apache.nifi.cluster.protocol.ConnectionRequest; -import org.apache.nifi.cluster.protocol.ConnectionResponse; -import org.apache.nifi.cluster.protocol.Heartbeat; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.diagnostics.SystemDiagnostics; -import org.apache.nifi.remote.cluster.NodeInformant; -import org.apache.nifi.reporting.BulletinRepository; - -/** - * Defines the interface for a ClusterManager. The cluster manager is a - * threadsafe centralized manager for a cluster. Members of a cluster are nodes. - * A member becomes a node by issuing a connection request to the manager. The - * manager maintains the set of nodes. Nodes may be disconnected, reconnected, - * and deleted. - * - * Nodes are responsible for sending heartbeats to the manager to indicate their - * liveliness. A manager may disconnect a node if it does not receive a - * heartbeat within a configurable time period. 
A cluster manager instance may - * be configured with how often to monitor received heartbeats - * (getHeartbeatMonitoringIntervalSeconds()) and the maximum time that may - * elapse between node heartbeats before disconnecting the node - * (getMaxHeartbeatGapSeconds()). - * - * Since only a single node may execute isolated processors, the cluster manager - * maintains the notion of a primary node. The primary node is chosen at cluster - * startup and retains the role until a user requests a different node to be the - * primary node. - * - * @author unattributed - */ -public interface ClusterManager extends NodeInformant { - - /** - * Handles a node's heartbeat. - * - * @param heartbeat a heartbeat - * - */ - void handleHeartbeat(Heartbeat heartbeat); - - /** - * @param statuses the statuses of the nodes - * @return the set of nodes - */ - Set<Node> getNodes(Status... statuses); - - /** - * @param nodeId - * @return returns the node with the given identifier or null if node does - * not exist - */ - Node getNode(String nodeId); - - /** - * @param statuses - * @return the set of node identifiers with the given node status - */ - Set<NodeIdentifier> getNodeIds(Status... statuses); - - /** - * Deletes the node with the given node identifier. If the given node is the - * primary node, then a subsequent request may be made to the manager to set - * a new primary node. - * - * @param nodeId the node identifier - * @param userDn the Distinguished Name of the user requesting the node be - * deleted from the cluster - * - * @throws UnknownNodeException if the node does not exist - * @throws IllegalNodeDeletionException if the node is not in a disconnected - * state - */ - void deleteNode(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeDeletionException; - - /** - * Requests a connection to the cluster. 
- * - * @param request the request - * - * @return the response - */ - ConnectionResponse requestConnection(ConnectionRequest request); - - /** - * Services reconnection requests for a given node. If the node indicates - * reconnection failure, then the node will be set to disconnected. - * Otherwise, a reconnection request will be sent to the node, initiating - * the connection handshake. - * - * @param nodeId a node identifier - * @param userDn the Distinguished Name of the user requesting the - * reconnection - * - * @throws UnknownNodeException if the node does not exist - * @throws IllegalNodeReconnectionException if the node is not disconnected - */ - void requestReconnection(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeReconnectionException; - - /** - * Requests the node with the given identifier be disconnected. - * - * @param nodeId the node identifier - * @param userDn the Distinguished Name of the user requesting the - * disconnection - * - * @throws UnknownNodeException if the node does not exist - * @throws IllegalNodeDisconnectionException if the node cannot be - * disconnected due to the cluster's state (e.g., node is last connected - * node or node is primary) - * @throws UnknownNodeException if the node does not exist - * @throws IllegalNodeDisconnectionException if the node is not disconnected - * @throws NodeDisconnectionException if the disconnection failed - */ - void requestDisconnection(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeDisconnectionException, NodeDisconnectionException; - - /** - * @return the time in seconds to wait between successive executions of - * heartbeat monitoring - */ - int getHeartbeatMonitoringIntervalSeconds(); - - /** - * @return the maximum time in seconds that is allowed between successive - * heartbeats of a node before disconnecting the node - */ - int getMaxHeartbeatGapSeconds(); - - /** - * Returns a list of node events for the node with the given identifier. 
The - * events will be returned in order of most recent to least recent according - * to the creation date of the event. - * - * @param nodeId the node identifier - * - * @return the list of events or an empty list if no node exists with the - * given identifier - */ - List<Event> getNodeEvents(final String nodeId); - - /** - * Revokes the primary role from the current primary node and assigns the - * primary role to given given node ID. - * - * If role revocation fails, then the current primary node is set to - * disconnected while retaining the primary role and no role assignment is - * performed. - * - * If role assignment fails, then the given node is set to disconnected and - * is given the primary role. - * - * @param nodeId the node identifier - * @param userDn the Distinguished Name of the user requesting that the - * Primary Node be assigned - * - * @throws UnknownNodeException if the node with the given identifier does - * not exist - * @throws IneligiblePrimaryNodeException if the node with the given - * identifier is not eligible to be the primary node - * @throws PrimaryRoleAssignmentException if the cluster was unable to - * change the primary role to the requested node - */ - void setPrimaryNode(String nodeId, String userDn) throws UnknownNodeException, IneligiblePrimaryNodeException, PrimaryRoleAssignmentException; - - /** - * @return the primary node of the cluster or null if no primary node exists - */ - Node getPrimaryNode(); - - /** - * Returns the bulletin repository. 
- * - * @return - */ - BulletinRepository getBulletinRepository(); - - /** - * Returns a {@link ProcessGroupStatus} that represents the status of all - * nodes with the given {@link Status}es for the given ProcessGroup id, or - * null if no nodes exist with the given statuses - * - * @param groupId - * @return - */ - ProcessGroupStatus getProcessGroupStatus(String groupId); - - /** - * Returns a merged representation of the System Diagnostics for all nodes - * in the cluster - * - * @return - */ - SystemDiagnostics getSystemDiagnostics(); -}
