Repository: incubator-nifi Updated Branches: refs/heads/NIFI-250 fac6cd7ac -> 7de30ab15
NIFI-250: Deleted dead code Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d30a1843 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d30a1843 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d30a1843 Branch: refs/heads/NIFI-250 Commit: d30a1843c24caccf822811649332dad7e1aa5b17 Parents: 371e010 Author: Mark Payne <[email protected]> Authored: Wed Feb 11 16:08:20 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Wed Feb 11 16:08:20 2015 -0500 ---------------------------------------------------------------------- .../nifi/controller/StandardFlowService.java | 7 +- .../controller/StandardFlowSynchronizer.java | 2 +- .../nifi/persistence/FlowConfigurationDAO.java | 13 -- .../StandardXMLFlowConfigurationDAO.java | 180 +------------------ 4 files changed, 3 insertions(+), 199 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d30a1843/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 790485c..f62c842 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -81,8 +81,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler { private final FlowController controller; private final Path flowXml; - private final Path taskConfigXml; - private final Path serviceConfigXml; private final FlowConfigurationDAO dao; private final int gracefulShutdownSeconds; private final boolean autoResumeState; @@ -154,14 +152,12 @@ public class StandardFlowService implements FlowService, ProtocolHandler { this.controller = controller; this.encryptor = encryptor; flowXml = Paths.get(properties.getProperty(NiFiProperties.FLOW_CONFIGURATION_FILE)); - taskConfigXml = Paths.get(properties.getProperty(NiFiProperties.TASK_CONFIGURATION_FILE)); - serviceConfigXml = Paths.get(properties.getProperty(NiFiProperties.SERVICE_CONFIGURATION_FILE)); gracefulShutdownSeconds = (int) FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD), TimeUnit.SECONDS); autoResumeState = properties.getAutoResumeState(); connectionRetryMillis = (int) FormatUtils.getTimeDuration(properties.getClusterManagerFlowRetrievalDelay(), TimeUnit.MILLISECONDS); - dao = new StandardXMLFlowConfigurationDAO(flowXml, taskConfigXml, serviceConfigXml, encryptor); + dao = new StandardXMLFlowConfigurationDAO(flowXml, encryptor); if (configuredForClustering) { @@ -608,7 +604,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler { if (firstControllerInitialization) { // load the controller services logger.debug("Loading controller services"); -// dao.loadControllerServices(controller); } // load the flow http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d30a1843/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index efeabad..2508b5b 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -361,7 +361,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } if ( autoResumeState ) { - if ( dto.getEnabled() == Boolean.TRUE ) { + if ( Boolean.TRUE.equals(dto.getEnabled()) ) { try { controller.enableControllerService(node); } catch (final Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d30a1843/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java index 8cab916..8957314 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java @@ -19,13 +19,11 @@ package org.apache.nifi.persistence; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.List; import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.FlowSerializationException; import org.apache.nifi.controller.FlowSynchronizationException; -import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.UninheritableFlowException; /** @@ -109,15 +107,4 @@ public interface FlowConfigurationDAO { */ void save(FlowController flow, boolean archive) throws IOException; - /** - * Instantiates and schedules all controller tasks from the file used in the - * constructor - * - * @param controller - * @return - * @throws java.io.IOException - * @returns all of the ReportingTasks that were instantiated & scheduled - */ - List<ReportingTaskNode> loadReportingTasks(FlowController controller) throws IOException; - } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d30a1843/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java index 3000869..b93ae8a 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java @@ -21,68 +21,36 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; -import javax.xml.XMLConstants; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; -import javax.xml.transform.dom.DOMSource; -import javax.xml.validation.Schema; -import javax.xml.validation.SchemaFactory; -import javax.xml.validation.Validator; - import org.apache.nifi.cluster.protocol.DataFlow; -import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.FlowSerializationException; import org.apache.nifi.controller.FlowSynchronizationException; import org.apache.nifi.controller.FlowSynchronizer; -import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.StandardFlowSerializer; import org.apache.nifi.controller.StandardFlowSynchronizer; import org.apache.nifi.controller.UninheritableFlowException; -import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; -import org.apache.nifi.controller.reporting.StandardReportingInitializationContext; import org.apache.nifi.encrypt.StringEncryptor; -import org.apache.nifi.nar.NarCloseable; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.reporting.ReportingInitializationContext; -import org.apache.nifi.reporting.ReportingTask; -import org.apache.nifi.scheduling.SchedulingStrategy; -import org.apache.nifi.util.DomUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.file.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.w3c.dom.DOMException; -import org.w3c.dom.Document; -import org.w3c.dom.Element; -import org.w3c.dom.NodeList; -import org.xml.sax.SAXException; -import org.xml.sax.SAXParseException; public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationDAO { public static final String CONFIGURATION_ARCHIVE_DIR_KEY = "nifi.flow.configuration.archive.dir"; private final Path flowXmlPath; - private final Path taskConfigXmlPath; private final StringEncryptor encryptor; private static final Logger LOG = LoggerFactory.getLogger(StandardXMLFlowConfigurationDAO.class); - public StandardXMLFlowConfigurationDAO(final Path flowXml, final Path taskConfigXml, final Path serviceConfigXml, final StringEncryptor encryptor) throws IOException { + public StandardXMLFlowConfigurationDAO(final Path flowXml, final StringEncryptor encryptor) throws IOException { final File flowXmlFile = flowXml.toFile(); if (!flowXmlFile.exists()) { Files.createDirectories(flowXml.getParent()); @@ -92,13 +60,7 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD throw new IOException(flowXml + " exists but you have insufficient read/write privileges"); } - final File taskConfigXmlFile = Objects.requireNonNull(taskConfigXml).toFile(); - if ((!taskConfigXmlFile.exists() || !taskConfigXmlFile.canRead())) { - throw new IOException(taskConfigXml + " does not appear to exist or cannot be read. Cannot load configuration."); - } - this.flowXmlPath = flowXml; - this.taskConfigXmlPath = taskConfigXml; this.encryptor = encryptor; } @@ -193,144 +155,4 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD } } - @Override - public List<ReportingTaskNode> loadReportingTasks(final FlowController controller) { - final List<ReportingTaskNode> tasks = new ArrayList<>(); - if (taskConfigXmlPath == null) { - LOG.info("No reporting tasks to start"); - return tasks; - } - - try { - final URL schemaUrl = getClass().getResource("/ReportingTaskConfiguration.xsd"); - final Document document = parse(taskConfigXmlPath.toFile(), schemaUrl); - - final NodeList tasksNodes = document.getElementsByTagName("tasks"); - final Element tasksElement = (Element) tasksNodes.item(0); - - for (final Element taskElement : DomUtils.getChildElementsByTagName(tasksElement, "task")) { - //add global properties common to all tasks - Map<String, String> properties = new HashMap<>(); - - //get properties for the specific reporting task - id, name, class, - //and schedulingPeriod must be set - final String taskName = DomUtils.getChild(taskElement, "name").getTextContent().trim(); - - final List<Element> schedulingStrategyNodeList = DomUtils.getChildElementsByTagName(taskElement, "schedulingStrategy"); - String schedulingStrategyValue = SchedulingStrategy.TIMER_DRIVEN.name(); - if (schedulingStrategyNodeList.size() == 1) { - final String specifiedValue = schedulingStrategyNodeList.get(0).getTextContent(); - - try { - schedulingStrategyValue = SchedulingStrategy.valueOf(specifiedValue).name(); - } catch (final Exception e) { - throw new RuntimeException("Cannot start Reporting Task with name " + taskName + " because its Scheduling Strategy does not have a valid value", e); - } - } - - final SchedulingStrategy schedulingStrategy = SchedulingStrategy.valueOf(schedulingStrategyValue); - final String taskSchedulingPeriod = DomUtils.getChild(taskElement, "schedulingPeriod").getTextContent().trim(); - final String taskClass = DomUtils.getChild(taskElement, "class").getTextContent().trim(); - - //optional task-specific properties - for (final Element optionalProperty : DomUtils.getChildElementsByTagName(taskElement, "property")) { - final String name = optionalProperty.getAttribute("name"); - final String value = optionalProperty.getTextContent().trim(); - properties.put(name, value); - } - - //set the class to be used for the configured reporting task - final ReportingTaskNode reportingTaskNode; - try { - reportingTaskNode = controller.createReportingTask(taskClass); - } catch (final ReportingTaskInstantiationException e) { - LOG.error("Unable to load reporting task {} due to {}", new Object[]{taskName, e}); - if (LOG.isDebugEnabled()) { - LOG.error("", e); - } - continue; - } - - reportingTaskNode.setName(taskName); - reportingTaskNode.setScheduldingPeriod(taskSchedulingPeriod); - reportingTaskNode.setSchedulingStrategy(schedulingStrategy); - - final ReportingTask reportingTask = reportingTaskNode.getReportingTask(); - - final ReportingInitializationContext config = new StandardReportingInitializationContext( - reportingTask.getIdentifier(), taskName, schedulingStrategy, taskSchedulingPeriod, controller); - reportingTask.initialize(config); - - final Map<PropertyDescriptor, String> resolvedProps; - try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { - resolvedProps = new HashMap<>(); - for (final Map.Entry<String, String> entry : properties.entrySet()) { - final PropertyDescriptor descriptor = reportingTask.getPropertyDescriptor(entry.getKey()); - resolvedProps.put(descriptor, entry.getValue()); - } - } - - for (final Map.Entry<PropertyDescriptor, String> entry : resolvedProps.entrySet()) { - reportingTaskNode.setProperty(entry.getKey().getName(), entry.getValue()); - } - - tasks.add(reportingTaskNode); - controller.startReportingTask(reportingTaskNode); - } - } catch (final SAXException | ParserConfigurationException | IOException | DOMException | NumberFormatException | InitializationException t) { - LOG.error("Unable to load reporting tasks from {} due to {}", new Object[]{taskConfigXmlPath, t}); - if (LOG.isDebugEnabled()) { - LOG.error("", t); - } - } - - return tasks; - } - - - private Document parse(final File xmlFile, final URL schemaUrl) throws SAXException, ParserConfigurationException, IOException { - final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); - final Schema schema = schemaFactory.newSchema(schemaUrl); - final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); - docFactory.setSchema(schema); - final DocumentBuilder builder = docFactory.newDocumentBuilder(); - - builder.setErrorHandler(new org.xml.sax.ErrorHandler() { - @Override - public void fatalError(final SAXParseException err) throws SAXException { - LOG.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage()); - if (LOG.isDebugEnabled()) { - LOG.error("Error Stack Dump", err); - } - throw err; - } - - @Override - public void error(final SAXParseException err) throws SAXParseException { - LOG.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage()); - if (LOG.isDebugEnabled()) { - LOG.error("Error Stack Dump", err); - } - throw err; - } - - @Override - public void warning(final SAXParseException err) throws SAXParseException { - LOG.warn(" Config file line " + err.getLineNumber() + ", uri " + err.getSystemId() + " : message : " + err.getMessage()); - if (LOG.isDebugEnabled()) { - LOG.warn("Warning stack dump", err); - } - throw err; - } - }); - - // build the docuemnt - final Document document = builder.parse(xmlFile); - - // ensure schema compliance - final Validator validator = schema.newValidator(); - validator.validate(new DOMSource(document)); - - return document; - } }
