http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index db9e68b..436215f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -16,38 +16,11 @@ */ package org.apache.nifi.controller; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.zip.GZIPInputStream; - -import javax.xml.XMLConstants; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; -import javax.xml.validation.Schema; -import javax.xml.validation.SchemaFactory; - import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import 
org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer; import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.cluster.protocol.StandardDataFlow; import org.apache.nifi.components.PropertyDescriptor; @@ -88,11 +61,13 @@ import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.Severity; -import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.scheduling.ExecutionNode; +import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.util.BundleUtils; import org.apache.nifi.util.DomUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.file.FileUtils; +import org.apache.nifi.web.api.dto.BundleDTO; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; @@ -114,6 +89,33 @@ import org.w3c.dom.Node; import org.w3c.dom.NodeList; import org.xml.sax.SAXException; +import javax.xml.XMLConstants; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.validation.Schema; +import javax.xml.validation.SchemaFactory; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import 
java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; + /** */ public class StandardFlowSynchronizer implements FlowSynchronizer { @@ -147,7 +149,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { @Override public void sync(final FlowController controller, final DataFlow proposedFlow, final StringEncryptor encryptor) - throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException { + throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException, MissingBundleException { // handle corner cases involving no proposed flow if (proposedFlow == null) { @@ -158,15 +160,15 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } - // determine if the controller has been initialized - final boolean initialized = controller.isInitialized(); - logger.debug("Synching FlowController with proposed flow: Controller is Initialized = {}", initialized); + // determine if the controller already had flow sync'd to it + final boolean flowAlreadySynchronized = controller.isFlowSynchronized(); + logger.debug("Synching FlowController with proposed flow: Controller is Already Synchronized = {}", flowAlreadySynchronized); // serialize controller state to bytes final byte[] existingFlow; final boolean existingFlowEmpty; try { - if (initialized) { + if (flowAlreadySynchronized) { existingFlow = toBytes(controller); existingFlowEmpty = controller.getGroup(controller.getRootGroupId()).isEmpty() && controller.getAllReportingTasks().isEmpty() && controller.getAllControllerServices().isEmpty(); } else { @@ -232,11 +234,24 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { policyBasedAuthorizer = null; } - final DataFlow existingDataFlow = new StandardDataFlow(existingFlow, existingSnippets, existingAuthFingerprint); + final Set<String> missingComponents = new HashSet<>(); + 
controller.getAllControllerServices().stream().filter(cs -> cs.isExtensionMissing()).forEach(cs -> missingComponents.add(cs.getIdentifier())); + controller.getAllReportingTasks().stream().filter(r -> r.isExtensionMissing()).forEach(r -> missingComponents.add(r.getIdentifier())); + controller.getRootGroup().findAllProcessors().stream().filter(p -> p.isExtensionMissing()).forEach(p -> missingComponents.add(p.getIdentifier())); + + final DataFlow existingDataFlow = new StandardDataFlow(existingFlow, existingSnippets, existingAuthFingerprint, missingComponents); + + Document configuration = null; // check that the proposed flow is inheritable by the controller try { - if (!existingFlowEmpty) { + if (existingFlowEmpty) { + configuration = parseFlowBytes(proposedFlow.getFlow()); + if (configuration != null) { + logger.trace("Checking bunde compatibility"); + checkBundleCompatibility(configuration); + } + } else { logger.trace("Checking flow inheritability"); final String problemInheritingFlow = checkFlowInheritability(existingDataFlow, proposedFlow, controller); if (problemInheritingFlow != null) { @@ -247,6 +262,13 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { throw new FlowSerializationException("Failed to generate flow fingerprints", fe); } + logger.trace("Checking missing component inheritability"); + + final String problemInheritingMissingComponents = checkMissingComponentsInheritability(existingDataFlow, proposedFlow); + if (problemInheritingMissingComponents != null) { + throw new UninheritableFlowException("Proposed Flow is not inheritable by the flow controller because of differences in missing components: " + problemInheritingMissingComponents); + } + logger.trace("Checking authorizer inheritability"); final AuthorizerInheritability authInheritability = checkAuthorizerInheritability(existingDataFlow, proposedFlow); @@ -256,7 +278,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { // create document by parsing 
proposed flow bytes logger.trace("Parsing proposed flow bytes as DOM document"); - final Document configuration = parseFlowBytes(proposedFlow.getFlow()); + if (configuration == null) { + configuration = parseFlowBytes(proposedFlow.getFlow()); + } // attempt to sync controller with proposed flow try { @@ -282,7 +306,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { // if this controller isn't initialized or its empty, add the root group, otherwise update final ProcessGroup rootGroup; - if (!initialized || existingFlowEmpty) { + if (!flowAlreadySynchronized || existingFlowEmpty) { logger.trace("Adding root process group"); rootGroup = addProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion); } else { @@ -315,7 +339,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final Map<ReportingTaskNode, ReportingTaskDTO> reportingTaskNodesToDTOs = new HashMap<>(); for (final Element taskElement : reportingTaskElements) { final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(taskElement, encryptor); - final ReportingTaskNode reportingTask = getOrCreateReportingTask(controller, dto, initialized, existingFlowEmpty); + final ReportingTaskNode reportingTask = getOrCreateReportingTask(controller, dto, flowAlreadySynchronized, existingFlowEmpty); reportingTaskNodesToDTOs.put(reportingTask, dto); } @@ -323,7 +347,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { if (controllerServicesElement != null) { final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService"); - if (!initialized || existingFlowEmpty) { + if (!flowAlreadySynchronized || existingFlowEmpty) { // If the encoding version is null, we are loading a flow from NiFi 0.x, where Controller // Services could not be scoped by Process Group. As a result, we want to move the Process Groups // to the root Group. 
Otherwise, we want to use a null group, which indicates a Controller-level @@ -370,7 +394,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { // now that controller services are loaded and enabled we can apply the scheduled state to each reporting task for (Map.Entry<ReportingTaskNode, ReportingTaskDTO> entry : reportingTaskNodesToDTOs.entrySet()) { - applyReportingTaskScheduleState(controller, entry.getValue(), entry.getKey(), initialized, existingFlowEmpty); + applyReportingTaskScheduleState(controller, entry.getValue(), entry.getKey(), flowAlreadySynchronized, existingFlowEmpty); } } } @@ -402,6 +426,42 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } + private void checkBundleCompatibility(final Document configuration) { + final NodeList bundleNodes = configuration.getElementsByTagName("bundle"); + for (int i = 0; i < bundleNodes.getLength(); i++) { + final Node bundleNode = bundleNodes.item(i); + if (bundleNode instanceof Element) { + final Element bundleElement = (Element) bundleNode; + + final Node componentNode = bundleElement.getParentNode(); + if (componentNode instanceof Element) { + final Element componentElement = (Element) componentNode; + if (!withinTemplate(componentElement)) { + final String componentType = DomUtils.getChildText(componentElement, "class"); + try { + BundleUtils.getBundle(componentType, FlowFromDOMFactory.getBundle(bundleElement)); + } catch (IllegalStateException e) { + throw new MissingBundleException(e.getMessage(), e); + } + } + } + } + } + } + + private boolean withinTemplate(final Element element) { + if ("template".equals(element.getTagName())) { + return true; + } else { + final Node parentNode = element.getParentNode(); + if (parentNode instanceof Element) { + return withinTemplate((Element) parentNode); + } else { + return false; + } + } + } + private void updateReportingTaskControllerServices(final Set<ReportingTaskNode> reportingTasks, final Map<String, 
ControllerServiceNode> controllerServiceMapping) { for (ReportingTaskNode reportingTask : reportingTasks) { if (reportingTask.getProperties() != null) { @@ -513,7 +573,19 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { throws ReportingTaskInstantiationException { // create a new reporting task node when the controller is not initialized or the flow is empty if (!controllerInitialized || existingFlowEmpty) { - final ReportingTaskNode reportingTask = controller.createReportingTask(dto.getType(), dto.getId(), false); + BundleCoordinate coordinate; + try { + coordinate = BundleUtils.getCompatibleBundle(dto.getType(), dto.getBundle()); + } catch (final IllegalStateException e) { + final BundleDTO bundleDTO = dto.getBundle(); + if (bundleDTO == null) { + coordinate = BundleCoordinate.UNKNOWN_COORDINATE; + } else { + coordinate = new BundleCoordinate(bundleDTO.getGroup(), bundleDTO.getArtifact(), bundleDTO.getVersion()); + } + } + + final ReportingTaskNode reportingTask = controller.createReportingTask(dto.getType(), dto.getId(), coordinate, false); reportingTask.setName(dto.getName()); reportingTask.setComments(dto.getComments()); reportingTask.setSchedulingPeriod(dto.getSchedulingPeriod()); @@ -964,7 +1036,20 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final List<Element> processorNodeList = getChildrenByTagName(processGroupElement, "processor"); for (final Element processorElement : processorNodeList) { final ProcessorDTO processorDTO = FlowFromDOMFactory.getProcessor(processorElement, encryptor); - final ProcessorNode procNode = controller.createProcessor(processorDTO.getType(), processorDTO.getId(), false); + + BundleCoordinate coordinate; + try { + coordinate = BundleUtils.getCompatibleBundle(processorDTO.getType(), processorDTO.getBundle()); + } catch (final IllegalStateException e) { + final BundleDTO bundleDTO = processorDTO.getBundle(); + if (bundleDTO == null) { + coordinate = 
BundleCoordinate.UNKNOWN_COORDINATE; + } else { + coordinate = new BundleCoordinate(bundleDTO.getGroup(), bundleDTO.getArtifact(), bundleDTO.getVersion()); + } + } + + final ProcessorNode procNode = controller.createProcessor(processorDTO.getType(), processorDTO.getId(), coordinate, false); processGroup.addProcessor(procNode); updateProcessor(procNode, processorDTO, processGroup, controller); } @@ -1257,6 +1342,30 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { return processGroup; } + public String checkMissingComponentsInheritability(final DataFlow existingFlow, final DataFlow proposedFlow) { + if (existingFlow == null) { + return null; // no existing flow, so equivalent to proposed flow + } + + final Set<String> existingMissingComponents = new HashSet<>(existingFlow.getMissingComponents()); + existingMissingComponents.removeAll(proposedFlow.getMissingComponents()); + + if (existingMissingComponents.size() > 0) { + final String missingIds = StringUtils.join(existingMissingComponents, ","); + return "Current flow has missing components that are not considered missing in the proposed flow (" + missingIds + ")"; + } + + final Set<String> proposedMissingComponents = new HashSet<>(proposedFlow.getMissingComponents()); + proposedMissingComponents.removeAll(existingFlow.getMissingComponents()); + + if (proposedMissingComponents.size() > 0) { + final String missingIds = StringUtils.join(proposedMissingComponents, ","); + return "Proposed flow has missing components that are not considered missing in the current flow (" + missingIds + ")"; + } + + return null; + } + /** * If both authorizers are external authorizers, or if the both are internal * authorizers with equal fingerprints, then an uniheritable result with no @@ -1353,6 +1462,11 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { return "Proposed Flow was empty but Current Flow is not"; // existing flow is not empty and proposed flow is empty (we could orphan flowfiles) } 
+ if (logger.isTraceEnabled()) { + logger.trace("Local Fingerprint Before Hash = {}", new Object[] {existingFlowFingerprintBeforeHash}); + logger.trace("Proposed Fingerprint Before Hash = {}", new Object[] {proposedFlowFingerprintBeforeHash}); + } + final boolean inheritable = existingFlowFingerprintBeforeHash.equals(proposedFlowFingerprintBeforeHash); if (!inheritable) { return findFirstDiscrepancy(existingFlowFingerprintBeforeHash, proposedFlowFingerprintBeforeHash, "Flows");
http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 3d2c222..afc94d8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -18,13 +18,9 @@ package org.apache.nifi.controller; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.nifi.annotation.behavior.EventDriven; -import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.behavior.SideEffectFree; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.configuration.DefaultSchedule; @@ -36,6 +32,8 @@ import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.resource.ResourceType; +import org.apache.nifi.bundle.BundleCoordinate; 
+import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.connectable.Connectable; @@ -107,7 +105,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable public static final String DEFAULT_YIELD_PERIOD = "1 sec"; public static final String DEFAULT_PENALIZATION_PERIOD = "30 sec"; private final AtomicReference<ProcessGroup> processGroup; - private final Processor processor; + private final AtomicReference<ProcessorDetails> processorRef; private final AtomicReference<String> identifier; private final Map<Connection, Connectable> destinations; private final Map<Relationship, Set<Connection>> connections; @@ -123,13 +121,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private final AtomicInteger concurrentTaskCount; private final AtomicLong yieldExpiration; private final AtomicLong schedulingNanos; - private final boolean triggerWhenEmpty; - private final boolean sideEffectFree; - private final boolean triggeredSerially; - private final boolean triggerWhenAnyDestinationAvailable; - private final boolean eventDrivenSupported; - private final boolean batchSupported; - private final Requirement inputRequirement; private final ProcessScheduler processScheduler; private long runNanos = 0L; private final NiFiProperties nifiProperties; @@ -138,23 +129,26 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable // ??????? 
NOT any more private ExecutionNode executionNode; - public StandardProcessorNode(final Processor processor, final String uuid, + public StandardProcessorNode(final LoggableComponent<Processor> processor, final String uuid, final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, final ControllerServiceProvider controllerServiceProvider, final NiFiProperties nifiProperties, - final VariableRegistry variableRegistry, final ComponentLog logger) { + final VariableRegistry variableRegistry) { this(processor, uuid, validationContextFactory, scheduler, controllerServiceProvider, - processor.getClass().getSimpleName(), processor.getClass().getCanonicalName(), nifiProperties, variableRegistry, logger); + processor.getComponent().getClass().getSimpleName(), processor.getComponent().getClass().getCanonicalName(), nifiProperties, variableRegistry, false); } - public StandardProcessorNode(final Processor processor, final String uuid, + public StandardProcessorNode(final LoggableComponent<Processor> processor, final String uuid, final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, final ControllerServiceProvider controllerServiceProvider, final String componentType, final String componentCanonicalClass, final NiFiProperties nifiProperties, - final VariableRegistry variableRegistry, final ComponentLog logger) { + final VariableRegistry variableRegistry, final boolean isExtensionMissing) { + + super(uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, isExtensionMissing); + + final ProcessorDetails processorDetails = new ProcessorDetails(processor); + this.processorRef = new AtomicReference<>(processorDetails); - super(processor, uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, logger); - this.processor = processor; identifier = new AtomicReference<>(uuid); destinations = new 
HashMap<>(); connections = new HashMap<>(); @@ -175,26 +169,11 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable penalizationPeriod = new AtomicReference<>(DEFAULT_PENALIZATION_PERIOD); this.nifiProperties = nifiProperties; - final Class<?> procClass = processor.getClass(); - triggerWhenEmpty = procClass.isAnnotationPresent(TriggerWhenEmpty.class); - sideEffectFree = procClass.isAnnotationPresent(SideEffectFree.class); - batchSupported = procClass.isAnnotationPresent(SupportsBatching.class); - triggeredSerially = procClass.isAnnotationPresent(TriggerSerially.class); - triggerWhenAnyDestinationAvailable = procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class); - eventDrivenSupported = procClass.isAnnotationPresent(EventDriven.class) && !triggeredSerially && !triggerWhenEmpty; - - final boolean inputRequirementPresent = procClass.isAnnotationPresent(InputRequirement.class); - if (inputRequirementPresent) { - inputRequirement = procClass.getAnnotation(InputRequirement.class).value(); - } else { - inputRequirement = Requirement.INPUT_ALLOWED; - } - schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN; executionNode = ExecutionNode.ALL; try { - if (procClass.isAnnotationPresent(DefaultSchedule.class)) { - DefaultSchedule dsc = procClass.getAnnotation(DefaultSchedule.class); + if (processorDetails.getProcClass().isAnnotationPresent(DefaultSchedule.class)) { + DefaultSchedule dsc = processorDetails.getProcClass().getAnnotation(DefaultSchedule.class); try { this.setSchedulingStrategy(dsc.strategy()); } catch (Throwable ex) { @@ -206,7 +185,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable this.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN); LOG.error(String.format("Error while setting scheduling period from DefaultSchedule annotation: %s", ex.getMessage()), ex); } - if (!triggeredSerially) { + if (!processorDetails.isTriggeredSerially()) { try { 
setMaxConcurrentTasks(dsc.concurrentTasks()); } catch (Throwable ex) { @@ -219,6 +198,21 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } } + @Override + public ConfigurableComponent getComponent() { + return processorRef.get().getProcessor(); + } + + @Override + public ComponentLog getLogger() { + return processorRef.get().getComponentLog(); + } + + @Override + public BundleCoordinate getBundleCoordinate() { + return processorRef.get().getBundleCoordinate(); + } + /** * @return comments about this specific processor instance */ @@ -250,7 +244,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable * new comments */ @Override - public void setComments(final String comments) { + public synchronized void setComments(final String comments) { if (isRunning()) { throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); } @@ -263,7 +257,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } @Override - public void setPosition(final Position position) { + public synchronized void setPosition(final Position position) { this.position.set(position); } @@ -273,7 +267,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } @Override - public void setStyle(final Map<String, String> style) { + public synchronized void setStyle(final Map<String, String> style) { if (style != null) { this.style.set(Collections.unmodifiableMap(new HashMap<>(style))); } @@ -304,7 +298,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable */ @Override public boolean isTriggerWhenEmpty() { - return triggerWhenEmpty; + return processorRef.get().isTriggerWhenEmpty(); } /** @@ -313,12 +307,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable */ @Override public boolean isSideEffectFree() { - return sideEffectFree; + return processorRef.get().isSideEffectFree(); } 
@Override public boolean isHighThroughputSupported() { - return batchSupported; + return processorRef.get().isBatchSupported(); } /** @@ -328,7 +322,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable */ @Override public boolean isTriggerWhenAnyDestinationAvailable() { - return triggerWhenAnyDestinationAvailable; + return processorRef.get().isTriggerWhenAnyDestinationAvailable(); } /** @@ -339,7 +333,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable * tolerant */ @Override - public void setLossTolerant(final boolean lossTolerant) { + public synchronized void setLossTolerant(final boolean lossTolerant) { if (isRunning()) { throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); } @@ -389,6 +383,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable * annotation, if one exists, else <code>null</code>. */ public String getProcessorDescription() { + final Processor processor = processorRef.get().getProcessor(); final CapabilityDescription capDesc = processor.getClass().getAnnotation(CapabilityDescription.class); String description = null; if (capDesc != null) { @@ -398,7 +393,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } @Override - public void setName(final String name) { + public synchronized void setName(final String name) { if (isRunning()) { throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); } @@ -420,7 +415,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public boolean isEventDrivenSupported() { - return this.eventDrivenSupported; + return processorRef.get().isEventDrivenSupported(); } /** @@ -434,8 +429,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable * Processor */ @Override - public void setSchedulingStrategy(final SchedulingStrategy 
schedulingStrategy) { - if (schedulingStrategy == SchedulingStrategy.EVENT_DRIVEN && !eventDrivenSupported) { + public synchronized void setSchedulingStrategy(final SchedulingStrategy schedulingStrategy) { + if (schedulingStrategy == SchedulingStrategy.EVENT_DRIVEN && !processorRef.get().isEventDrivenSupported()) { // not valid. Just ignore it. We don't throw an Exception because if // a developer changes a Processor so that // it no longer supports EventDriven mode, we don't want the app to @@ -461,7 +456,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } @Override - public void setScheduldingPeriod(final String schedulingPeriod) { + public synchronized void setScheduldingPeriod(final String schedulingPeriod) { if (isRunning()) { throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); } @@ -495,7 +490,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } @Override - public void setExecutionNode(final ExecutionNode executionNode) { + public synchronized void setExecutionNode(final ExecutionNode executionNode) { this.executionNode = executionNode; } @@ -510,7 +505,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } @Override - public void setRunDuration(final long duration, final TimeUnit timeUnit) { + public synchronized void setRunDuration(final long duration, final TimeUnit timeUnit) { if (duration < 0) { throw new IllegalArgumentException("Run Duration must be non-negative value; cannot set to " + timeUnit.toSeconds(duration) + " seconds"); @@ -530,7 +525,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } @Override - public void setYieldPeriod(final String yieldPeriod) { + public synchronized void setYieldPeriod(final String yieldPeriod) { if (isRunning()) { throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); } @@ -549,6 
+544,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable */ @Override public void yield() { + final Processor processor = processorRef.get().getProcessor(); final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS); yield(yieldMillis, TimeUnit.MILLISECONDS); @@ -587,7 +583,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } @Override - public void setPenalizationPeriod(final String penalizationPeriod) { + public synchronized void setPenalizationPeriod(final String penalizationPeriod) { if (isRunning()) { throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); } @@ -609,21 +605,21 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable * if the given value is less than 1 */ @Override - public void setMaxConcurrentTasks(final int taskCount) { + public synchronized void setMaxConcurrentTasks(final int taskCount) { if (isRunning()) { throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); } if (taskCount < 1 && getSchedulingStrategy() != SchedulingStrategy.EVENT_DRIVEN) { throw new IllegalArgumentException(); } - if (!triggeredSerially) { + if (!isTriggeredSerially()) { concurrentTaskCount.set(taskCount); } } @Override public boolean isTriggeredSerially() { - return triggeredSerially; + return processorRef.get().isTriggeredSerially(); } /** @@ -641,7 +637,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } @Override - public void setBulletinLevel(final LogLevel level) { + public synchronized void setBulletinLevel(final LogLevel level) { LogRepositoryFactory.getRepository(getIdentifier()).setObservationLevel(BULLETIN_OBSERVER_ID, level); } @@ -842,6 +838,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable Relationship returnRel = specRel; final Set<Relationship> relationships; + final Processor processor = 
processorRef.get().getProcessor(); try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { relationships = processor.getRelationships(); } @@ -857,7 +854,17 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public Processor getProcessor() { - return this.processor; + return processorRef.get().getProcessor(); + } + + @Override + public synchronized void setProcessor(final LoggableComponent<Processor> processor) { + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } + + final ProcessorDetails processorDetails = new ProcessorDetails(processor); + processorRef.set(processorDetails); } /** @@ -888,6 +895,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable public Set<Relationship> getUndefinedRelationships() { final Set<Relationship> undefined = new HashSet<>(); final Set<Relationship> relationships; + final Processor processor = processorRef.get().getProcessor(); try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { relationships = processor.getRelationships(); } @@ -943,10 +951,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable final ValidationContext validationContext = this.getValidationContextFactory() .newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier()); - final Collection<ValidationResult> validationResults; - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) { - validationResults = getProcessor().validate(validationContext); - } + final Collection<ValidationResult> validationResults = super.validate(validationContext); for (final ValidationResult result : validationResults) { if (!result.isValid()) { @@ 
-993,10 +998,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable final ValidationContext validationContext = this.getValidationContextFactory() .newValidationContext(getProperties(), getAnnotationData(), getProcessGroup().getIdentifier(), getIdentifier()); - final Collection<ValidationResult> validationResults; - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) { - validationResults = getProcessor().validate(validationContext); - } + final Collection<ValidationResult> validationResults = super.validate(validationContext); for (final ValidationResult result : validationResults) { if (!result.isValid()) { @@ -1045,7 +1047,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public Requirement getInputRequirement() { - return inputRequirement; + return processorRef.get().getInputRequirement(); } /** @@ -1071,14 +1073,16 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public Collection<Relationship> getRelationships() { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) { + final Processor processor = processorRef.get().getProcessor(); + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { return getProcessor().getRelationships(); } } @Override public String toString() { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) { + final Processor processor = processorRef.get().getProcessor(); + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { return getProcessor().toString(); } } @@ -1089,12 +1093,13 @@ public class StandardProcessorNode extends ProcessorNode implements 
Connectable } @Override - public void setProcessGroup(final ProcessGroup group) { + public synchronized void setProcessGroup(final ProcessGroup group) { this.processGroup.set(group); } @Override public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) { + final Processor processor = processorRef.get().getProcessor(); try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { processor.onTrigger(context, sessionFactory); } @@ -1266,8 +1271,15 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable if (!this.isValid()) { throw new IllegalStateException( "Processor " + this.getName() + " is not in a valid state due to " + this.getValidationErrors()); } + final Processor processor = processorRef.get().getProcessor(); final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor); - if (this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.STARTING)) { // will ensure that the Processor represented by this node can only be started once + + final boolean starting; + synchronized (this) { + starting = this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.STARTING); + } + + if (starting) { // will ensure that the Processor represented by this node can only be started once final Runnable startProcRunnable = new Runnable() { @Override public void run() { @@ -1293,7 +1305,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } catch (final Exception e) { final Throwable cause = e instanceof InvocationTargetException ? 
e.getCause() : e; procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {} seconds", - new Object[] {StandardProcessorNode.this.getProcessor(), cause, administrativeYieldMillis / 1000L}, cause); + new Object[]{StandardProcessorNode.this.getProcessor(), cause, administrativeYieldMillis / 1000L}, cause); LOG.error("Failed to invoke @OnScheduled method due to {}", cause.toString(), cause); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); @@ -1309,7 +1321,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable }; taskScheduler.execute(startProcRunnable); } else { - final String procName = this.processor.getClass().getSimpleName(); + final String procName = processorRef.get().getProcessor().getClass().getSimpleName(); LOG.warn("Can not start '" + procName + "' since it's already in the process of being started or it is DISABLED - " + scheduledState.get()); @@ -1348,7 +1360,11 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public <T extends ProcessContext & ControllerServiceLookup> void stop(final ScheduledExecutorService scheduler, final T processContext, final SchedulingAgent schedulingAgent, final ScheduleState scheduleState) { - LOG.info("Stopping processor: " + this.processor.getClass()); + + final Processor processor = processorRef.get().getProcessor(); + LOG.info("Stopping processor: " + processor.getClass()); + + if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING)) { // will ensure that the Processor represented by this node can only be stopped once scheduleState.incrementActiveThreadCount(); @@ -1385,14 +1401,14 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } }); } else { - /* - * We do compareAndSet() instead of set() to ensure that Processor - * stoppage is handled consistently including a condition where - * Processor never got a
chance to transition to RUNNING state - * before stop() was called. If that happens the stop processor - * routine will be initiated in start() method, otherwise the IF - * part will handle the stop processor routine. - */ + /* + * We do compareAndSet() instead of set() to ensure that Processor + * stoppage is handled consistently including a condition where + * Processor never got a chance to transition to RUNNING state + * before stop() was called. If that happens the stop processor + * routine will be initiated in start() method, otherwise the IF + * part will handle the stop processor routine. + */ this.scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.STOPPING); } } @@ -1424,6 +1440,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable * </p> */ private <T> void invokeTaskAsCancelableFuture(final SchedulingAgentCallback callback, final Callable<T> task) { + final Processor processor = processorRef.get().getProcessor(); final String timeoutString = nifiProperties.getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT); final long onScheduleTimeout = timeoutString == null ? 
60000 : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS); @@ -1431,19 +1448,19 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable try { taskFuture.get(onScheduleTimeout, TimeUnit.MILLISECONDS); } catch (final InterruptedException e) { - LOG.warn("Thread was interrupted while waiting for processor '" + this.processor.getClass().getSimpleName() + LOG.warn("Thread was interrupted while waiting for processor '" + processor.getClass().getSimpleName() + "' lifecycle OnScheduled operation to finish."); Thread.currentThread().interrupt(); throw new RuntimeException("Interrupted while executing one of processor's OnScheduled tasks.", e); } catch (final TimeoutException e) { taskFuture.cancel(true); LOG.warn("Timed out while waiting for OnScheduled of '" - + this.processor.getClass().getSimpleName() + + processor.getClass().getSimpleName() + "' processor to finish. An attempt is made to cancel the task via Thread.interrupt(). However it does not " + "guarantee that the task will be canceled since the code inside current OnScheduled operation may " + "have been written to ignore interrupts which may result in a runaway thread. This could lead to more issues, " + "eventually requiring NiFi to be restarted. This is usually a bug in the target Processor '" - + this.processor + "' that needs to be documented, reported and eventually fixed."); + + processor + "' that needs to be documented, reported and eventually fixed."); throw new RuntimeException("Timed out while executing one of processor's OnScheduled task.", e); } catch (final ExecutionException e){ throw new RuntimeException("Failed while executing one of processor's OnScheduled task.", e); @@ -1457,4 +1474,5 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable final ProcessGroup group = getProcessGroup(); return group == null ? 
null : group.getIdentifier(); } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java index 24abd5f..ff7d61f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java @@ -17,20 +17,6 @@ package org.apache.nifi.controller; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.xml.bind.JAXBContext; -import javax.xml.bind.Unmarshaller; -import javax.xml.transform.dom.DOMSource; - import org.apache.nifi.persistence.TemplateDeserializer; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.web.api.dto.ConnectableDTO; @@ -46,6 +32,19 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.TemplateDTO; import org.w3c.dom.Element; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.Unmarshaller; +import javax.xml.transform.dom.DOMSource; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import 
java.util.Set; + public class TemplateUtils { public static TemplateDTO parseDto(final Element templateElement) { @@ -199,6 +198,8 @@ public class TemplateUtils { } } + processorDTO.setExtensionMissing(null); + processorDTO.setMultipleVersionsAvailable(null); processorDTO.setValidationErrors(null); processorDTO.setInputRequirement(null); processorDTO.setDescription(null); @@ -229,6 +230,7 @@ public class TemplateUtils { descriptor.setRequired(null); descriptor.setSensitive(null); descriptor.setSupportsEl(null); + descriptor.setIdentifiesControllerServiceBundle(null); } private static void scrubControllerServices(final Set<ControllerServiceDTO> controllerServices) { @@ -246,6 +248,11 @@ public class TemplateUtils { } } + serviceDTO.setControllerServiceApis(null); + + serviceDTO.setExtensionMissing(null); + serviceDTO.setMultipleVersionsAvailable(null); + serviceDTO.setCustomUiUrl(null); serviceDTO.setValidationErrors(null); } http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java index d7ae309..aeb1d07 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java @@ -17,10 +17,13 @@ package org.apache.nifi.controller.reporting; import 
org.apache.nifi.annotation.configuration.DefaultSchedule; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractConfiguredComponent; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.controller.LoggableComponent; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; @@ -49,7 +52,7 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon private static final Logger LOG = LoggerFactory.getLogger(AbstractReportingTaskNode.class); - private final ReportingTask reportingTask; + private final AtomicReference<ReportingTaskDetails> reportingTaskRef; private final ProcessScheduler processScheduler; private final ControllerServiceLookup serviceLookup; @@ -59,28 +62,26 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon private volatile String comment; private volatile ScheduledState scheduledState = ScheduledState.STOPPED; - public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id, + public AbstractReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler, - final ValidationContextFactory validationContextFactory, final VariableRegistry variableRegistry, - final ComponentLog logger) { + final ValidationContextFactory validationContextFactory, final VariableRegistry variableRegistry) { this(reportingTask, id, controllerServiceProvider, processScheduler, validationContextFactory, - reportingTask.getClass().getSimpleName(), reportingTask.getClass().getCanonicalName(),variableRegistry, logger); + 
reportingTask.getComponent().getClass().getSimpleName(), reportingTask.getComponent().getClass().getCanonicalName(),variableRegistry, false); } - public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id, - final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler, - final ValidationContextFactory validationContextFactory, + public AbstractReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final ControllerServiceProvider controllerServiceProvider, + final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory, final String componentType, final String componentCanonicalClass, final VariableRegistry variableRegistry, - final ComponentLog logger) { + final boolean isExtensionMissing) { - super(reportingTask, id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, logger); - this.reportingTask = reportingTask; + super(id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, isExtensionMissing); + this.reportingTaskRef = new AtomicReference<>(new ReportingTaskDetails(reportingTask)); this.processScheduler = processScheduler; this.serviceLookup = controllerServiceProvider; - final Class<?> reportingClass = reportingTask.getClass(); + final Class<?> reportingClass = reportingTask.getComponent().getClass(); DefaultSchedule dsc = AnnotationUtils.findAnnotation(reportingClass, DefaultSchedule.class); if(dsc != null) { @@ -99,6 +100,21 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon } @Override + public ConfigurableComponent getComponent() { + return reportingTaskRef.get().getReportingTask(); + } + + @Override + public BundleCoordinate getBundleCoordinate() { + return reportingTaskRef.get().getBundleCoordinate(); + } + + @Override + public ComponentLog getLogger() { + return 
reportingTaskRef.get().getComponentLog(); + } + + @Override public void setSchedulingStrategy(final SchedulingStrategy schedulingStrategy) { this.schedulingStrategy.set(schedulingStrategy); } @@ -125,7 +141,15 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon @Override public ReportingTask getReportingTask() { - return reportingTask; + return reportingTaskRef.get().getReportingTask(); + } + + @Override + public void setReportingTask(final LoggableComponent<ReportingTask> reportingTask) { + if (isRunning()) { + throw new IllegalStateException("Cannot modify Reporting Task configuration while Reporting Task is running"); + } + this.reportingTaskRef.set(new ReportingTaskDetails(reportingTask)); } @Override @@ -177,50 +201,50 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon @Override public void verifyCanDelete() { if (isRunning()) { - throw new IllegalStateException("Cannot delete " + reportingTask.getIdentifier() + " because it is currently running"); + throw new IllegalStateException("Cannot delete " + getReportingTask().getIdentifier() + " because it is currently running"); } } @Override public void verifyCanDisable() { if (isRunning()) { - throw new IllegalStateException("Cannot disable " + reportingTask.getIdentifier() + " because it is currently running"); + throw new IllegalStateException("Cannot disable " + getReportingTask().getIdentifier() + " because it is currently running"); } if (isDisabled()) { - throw new IllegalStateException("Cannot disable " + reportingTask.getIdentifier() + " because it is already disabled"); + throw new IllegalStateException("Cannot disable " + getReportingTask().getIdentifier() + " because it is already disabled"); } } @Override public void verifyCanEnable() { if (!isDisabled()) { - throw new IllegalStateException("Cannot enable " + reportingTask.getIdentifier() + " because it is not disabled"); + throw new IllegalStateException("Cannot enable " + 
getReportingTask().getIdentifier() + " because it is not disabled"); } } @Override public void verifyCanStart() { if (isDisabled()) { - throw new IllegalStateException("Cannot start " + reportingTask.getIdentifier() + " because it is currently disabled"); + throw new IllegalStateException("Cannot start " + getReportingTask().getIdentifier() + " because it is currently disabled"); } if (isRunning()) { - throw new IllegalStateException("Cannot start " + reportingTask.getIdentifier() + " because it is already running"); + throw new IllegalStateException("Cannot start " + getReportingTask().getIdentifier() + " because it is already running"); } } @Override public void verifyCanStop() { if (!isRunning()) { - throw new IllegalStateException("Cannot stop " + reportingTask.getIdentifier() + " because it is not running"); + throw new IllegalStateException("Cannot stop " + getReportingTask().getIdentifier() + " because it is not running"); } } @Override public void verifyCanUpdate() { if (isRunning()) { - throw new IllegalStateException("Cannot update " + reportingTask.getIdentifier() + " because it is currently running"); + throw new IllegalStateException("Cannot update " + getReportingTask().getIdentifier() + " because it is currently running"); } } @@ -275,4 +299,5 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon } return results != null ? 
results : Collections.emptySet(); } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskDetails.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskDetails.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskDetails.java new file mode 100644 index 0000000..3561c07 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskDetails.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.reporting; + +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.controller.LoggableComponent; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.reporting.ReportingTask; + +/** + * Holder for StandardReportingTaskNode to atomically swap out the component. + */ +class ReportingTaskDetails { + + private final ReportingTask reportingTask; + private final ComponentLog componentLog; + private final BundleCoordinate bundleCoordinate; + + public ReportingTaskDetails(final LoggableComponent<ReportingTask> reportingTask) { + this.reportingTask = reportingTask.getComponent(); + this.componentLog = reportingTask.getLogger(); + this.bundleCoordinate = reportingTask.getBundleCoordinate(); + } + + public ReportingTask getReportingTask() { + return reportingTask; + } + + public ComponentLog getComponentLog() { + return componentLog; + } + + public BundleCoordinate getBundleCoordinate() { + return bundleCoordinate; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java index adb2240..edf1b67 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java @@ -22,10 +22,10 @@ import 
org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.resource.ResourceType; import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.LoggableComponent; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ValidationContextFactory; -import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.ReportingTask; @@ -34,17 +34,18 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme private final FlowController flowController; - public StandardReportingTaskNode(final ReportingTask reportingTask, final String id, final FlowController controller, + public StandardReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final FlowController controller, final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory, - final VariableRegistry variableRegistry, final ComponentLog logger) { - super(reportingTask, id, controller, processScheduler, validationContextFactory, variableRegistry, logger); + final VariableRegistry variableRegistry) { + super(reportingTask, id, controller, processScheduler, validationContextFactory, variableRegistry); this.flowController = controller; } - public StandardReportingTaskNode(final ReportingTask reportingTask, final String id, final FlowController controller, - final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory, - final String componentType, final String canonicalClassName, final VariableRegistry variableRegistry, final ComponentLog logger) { - super(reportingTask, id, controller, processScheduler, validationContextFactory, componentType, canonicalClassName,variableRegistry, 
logger); + public StandardReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final FlowController controller, + final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory, + final String componentType, final String canonicalClassName, final VariableRegistry variableRegistry, + final boolean isExtensionMissing) { + super(reportingTask, id, controller, processScheduler, validationContextFactory, componentType, canonicalClassName,variableRegistry, isExtensionMissing); this.flowController = controller; } http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java index f8d38bc..731d914 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java @@ -16,24 +16,16 @@ */ package org.apache.nifi.controller.serialization; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - import org.apache.nifi.connectable.Size; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.service.ControllerServiceState; import 
org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor; -import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.scheduling.ExecutionNode; +import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.DomUtils; +import org.apache.nifi.web.api.dto.BundleDTO; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO; @@ -50,8 +42,29 @@ import org.apache.nifi.web.api.dto.ReportingTaskDTO; import org.w3c.dom.Element; import org.w3c.dom.NodeList; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + public class FlowFromDOMFactory { + public static BundleDTO getBundle(final Element bundleElement) { + if (bundleElement == null) { + return null; + } + + final Element groupElement = DomUtils.getChild(bundleElement, "group"); + final Element artifactElement = DomUtils.getChild(bundleElement, "artifact"); + final Element versionElement = DomUtils.getChild(bundleElement, "version"); + + return new BundleDTO(groupElement.getTextContent(), artifactElement.getTextContent(), versionElement.getTextContent()); + } + public static PositionDTO getPosition(final Element positionElement) { if (positionElement == null) { throw new IllegalArgumentException("Invalid Flow: Found no 'position' element"); @@ -89,6 +102,7 @@ public class FlowFromDOMFactory { dto.setName(getString(element, "name")); dto.setComments(getString(element, "comment")); dto.setType(getString(element, "class")); + dto.setBundle(getBundle(DomUtils.getChild(element, "bundle"))); final boolean enabled = getBoolean(element, "enabled"); dto.setState(enabled ? 
ControllerServiceState.ENABLED.name() : ControllerServiceState.DISABLED.name()); @@ -106,6 +120,7 @@ public class FlowFromDOMFactory { dto.setName(getString(element, "name")); dto.setComments(getString(element, "comment")); dto.setType(getString(element, "class")); + dto.setBundle(getBundle(DomUtils.getChild(element, "bundle"))); dto.setSchedulingPeriod(getString(element, "schedulingPeriod")); dto.setState(getString(element, "scheduledState")); dto.setSchedulingStrategy(getString(element, "schedulingStrategy")); @@ -353,6 +368,7 @@ public class FlowFromDOMFactory { dto.setId(getString(element, "id")); dto.setName(getString(element, "name")); dto.setType(getString(element, "class")); + dto.setBundle(getBundle(DomUtils.getChild(element, "bundle"))); dto.setPosition(getPosition(DomUtils.getChild(element, "position"))); dto.setStyle(getStyle(DomUtils.getChild(element, "styles"))); http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizer.java index 86614af..bd0a7ad 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizer.java @@ -18,6 +18,7 @@ package org.apache.nifi.controller.serialization; import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.controller.FlowController; 
+import org.apache.nifi.controller.MissingBundleException; import org.apache.nifi.controller.UninheritableFlowException; import org.apache.nifi.encrypt.StringEncryptor; @@ -38,8 +39,9 @@ public interface FlowSynchronizer { * @throws FlowSerializationException if proposed flow is not a valid flow configuration file * @throws UninheritableFlowException if the proposed flow cannot be loaded by the controller because in doing so would risk orphaning flow files * @throws FlowSynchronizationException if updates to the controller failed. If this exception is thrown, then the controller should be considered unsafe to be used + * @throws MissingBundleException if the proposed flow cannot be loaded by the controller because it contains a bundle that is not available to the controller */ void sync(FlowController controller, DataFlow dataFlow, StringEncryptor encryptor) - throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException; + throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException, MissingBundleException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index f6e3d2b..aa7022b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -16,24 +16,7 @@ */ package org.apache.nifi.controller.serialization; -import java.io.BufferedOutputStream; -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; -import javax.xml.transform.OutputKeys; -import javax.xml.transform.Transformer; -import javax.xml.transform.TransformerException; -import javax.xml.transform.TransformerFactory; -import javax.xml.transform.TransformerFactoryConfigurationError; -import javax.xml.transform.dom.DOMSource; -import javax.xml.transform.stream.StreamResult; - +import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; @@ -62,6 +45,23 @@ import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.transform.OutputKeys; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.TransformerFactoryConfigurationError; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; +import java.util.concurrent.TimeUnit; + /** * Serializes a Flow Controller as XML to an output stream. 
* @@ -203,6 +203,28 @@ public class StandardFlowSerializer implements FlowSerializer { } } + private static void addBundle(final Element parentElement, final BundleCoordinate coordinate) { + // group + final Element groupElement = parentElement.getOwnerDocument().createElement("group"); + groupElement.setTextContent(coordinate.getGroup()); + + // artifact + final Element artifactElement = parentElement.getOwnerDocument().createElement("artifact"); + artifactElement.setTextContent(coordinate.getId()); + + // version + final Element versionElement = parentElement.getOwnerDocument().createElement("version"); + versionElement.setTextContent(coordinate.getVersion()); + + // bundle + final Element bundleElement = parentElement.getOwnerDocument().createElement("bundle"); + bundleElement.appendChild(groupElement); + bundleElement.appendChild(artifactElement); + bundleElement.appendChild(versionElement); + + parentElement.appendChild(bundleElement); + } + private void addStyle(final Element parentElement, final Map<String, String> style) { final Element element = parentElement.getOwnerDocument().createElement("styles"); @@ -340,6 +362,9 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "comment", processor.getComments()); addTextElement(element, "class", processor.getCanonicalClassName()); + + addBundle(element, processor.getBundleCoordinate()); + addTextElement(element, "maxConcurrentTasks", processor.getMaxConcurrentTasks()); addTextElement(element, "schedulingPeriod", processor.getSchedulingPeriod()); addTextElement(element, "penalizationPeriod", processor.getPenalizationPeriod()); @@ -451,6 +476,8 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(serviceElement, "comment", serviceNode.getComments()); addTextElement(serviceElement, "class", serviceNode.getCanonicalClassName()); + addBundle(serviceElement, serviceNode.getBundleCoordinate()); + final ControllerServiceState state = 
serviceNode.getState(); final boolean enabled = (state == ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING); addTextElement(serviceElement, "enabled", String.valueOf(enabled)); @@ -466,6 +493,9 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(taskElement, "name", taskNode.getName()); addTextElement(taskElement, "comment", taskNode.getComments()); addTextElement(taskElement, "class", taskNode.getCanonicalClassName()); + + addBundle(taskElement, taskNode.getBundleCoordinate()); + addTextElement(taskElement, "schedulingPeriod", taskNode.getSchedulingPeriod()); addTextElement(taskElement, "scheduledState", taskNode.getScheduledState().name()); addTextElement(taskElement, "schedulingStrategy", taskNode.getSchedulingStrategy().name()); http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceDetails.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceDetails.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceDetails.java new file mode 100644 index 0000000..3db77c9 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceDetails.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.service; + +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.LoggableComponent; +import org.apache.nifi.logging.ComponentLog; + +/** + * Holder for StandardControllerServiceNode to atomically swap out the component. + */ +public class ControllerServiceDetails { + + private final ControllerService proxiedControllerService; + private final ControllerService implementation; + private final ComponentLog componentLog; + private final BundleCoordinate bundleCoordinate; + private final ControllerServiceInvocationHandler invocationHandler; + + public ControllerServiceDetails(final LoggableComponent<ControllerService> implementation, + final LoggableComponent<ControllerService> proxiedControllerService, + final ControllerServiceInvocationHandler invocationHandler) { + this.proxiedControllerService = proxiedControllerService.getComponent(); + this.implementation = implementation.getComponent(); + this.componentLog = implementation.getLogger(); + this.bundleCoordinate = implementation.getBundleCoordinate(); + this.invocationHandler = invocationHandler; + } + + public ControllerService getProxiedControllerService() { + return proxiedControllerService; + } + + public ControllerService getImplementation() { + return implementation; + } + + public ComponentLog getComponentLog() 
{ + return componentLog; + } + + public BundleCoordinate getBundleCoordinate() { + return bundleCoordinate; + } + + public ControllerServiceInvocationHandler getInvocationHandler() { + return invocationHandler; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java index 1596c63..520d3fb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java @@ -16,30 +16,16 @@ */ package org.apache.nifi.controller.service; -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; - +import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.serialization.FlowFromDOMFactory; import org.apache.nifi.encrypt.StringEncryptor; 
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.util.BundleUtils; import org.apache.nifi.util.DomUtils; +import org.apache.nifi.web.api.dto.BundleDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +34,22 @@ import org.w3c.dom.Element; import org.xml.sax.SAXException; import org.xml.sax.SAXParseException; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + public class ControllerServiceLoader { private static final Logger logger = LoggerFactory.getLogger(ControllerServiceLoader.class); @@ -161,7 +163,7 @@ public class ControllerServiceLoader { // create a new id for the clone seeded from the original id so that it is consistent in a cluster final UUID id = UUID.nameUUIDFromBytes(controllerService.getIdentifier().getBytes(StandardCharsets.UTF_8)); - final ControllerServiceNode clone = provider.createControllerService(controllerService.getCanonicalClassName(), id.toString(), false); + final ControllerServiceNode clone = provider.createControllerService(controllerService.getCanonicalClassName(), id.toString(), controllerService.getBundleCoordinate(), false); clone.setName(controllerService.getName()); clone.setComments(controllerService.getComments()); @@ -179,7 +181,19 @@ public class ControllerServiceLoader { private static ControllerServiceNode createControllerService(final ControllerServiceProvider provider, final Element controllerServiceElement, final StringEncryptor encryptor) { 
final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); - final ControllerServiceNode node = provider.createControllerService(dto.getType(), dto.getId(), false); + BundleCoordinate coordinate; + try { + coordinate = BundleUtils.getCompatibleBundle(dto.getType(), dto.getBundle()); + } catch (final IllegalStateException e) { + final BundleDTO bundleDTO = dto.getBundle(); + if (bundleDTO == null) { + coordinate = BundleCoordinate.UNKNOWN_COORDINATE; + } else { + coordinate = new BundleCoordinate(bundleDTO.getGroup(), bundleDTO.getArtifact(), bundleDTO.getVersion()); + } + } + + final ControllerServiceNode node = provider.createControllerService(dto.getType(), dto.getId(), coordinate, false); node.setName(dto.getName()); node.setComments(dto.getComments()); return node;
