http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 191fc65..f312096 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -35,6 +35,8 @@ import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.DataAuthorizable; import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor; import org.apache.nifi.cluster.coordination.node.ClusterRoles; @@ -63,6 +65,7 @@ import org.apache.nifi.controller.cluster.ClusterProtocolHeartbeater; import org.apache.nifi.controller.cluster.Heartbeater; import org.apache.nifi.controller.exception.CommunicationsException; import org.apache.nifi.controller.exception.ComponentLifeCycleException; +import org.apache.nifi.controller.exception.ControllerServiceInstantiationException; import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.label.StandardLabel; @@ -105,6 +108,7 @@ import org.apache.nifi.controller.serialization.FlowSerializationException; import org.apache.nifi.controller.serialization.FlowSerializer; import org.apache.nifi.controller.serialization.FlowSynchronizationException; import org.apache.nifi.controller.serialization.FlowSynchronizer; +import org.apache.nifi.controller.service.ControllerServiceInvocationHandler; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.StandardConfigurationContext; @@ -150,6 +154,7 @@ import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.SimpleProcessLogger; +import org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.processor.StandardProcessorInitializationContext; import org.apache.nifi.processor.StandardValidationContextFactory; import org.apache.nifi.provenance.IdentifierLookup; @@ -184,11 +189,13 @@ import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.stream.io.LimitingInputStream; import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.BundleUtils; import org.apache.nifi.util.ComponentIdGenerator; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.web.ResourceNotFoundException; +import org.apache.nifi.web.api.dto.BundleDTO; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO; @@ -218,6 +225,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; @@ -279,6 +287,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final Set<RemoteSiteListener> externalSiteListeners = new HashSet<>(); private final AtomicReference<CounterRepository> counterRepositoryRef; private final AtomicBoolean initialized = new AtomicBoolean(false); + private final AtomicBoolean flowSynchronized = new AtomicBoolean(false); private final StandardControllerServiceProvider controllerServiceProvider; private final Authorizer authorizer; private final AuditService auditService; @@ -1012,13 +1021,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * * @param type processor type * @param id processor id + * @param coordinate the coordinate of the bundle for this processor * @return new processor * @throws NullPointerException if either arg is null * @throws ProcessorInstantiationException if the processor cannot be * instantiated for any reason */ - public ProcessorNode createProcessor(final String type, final String id) throws ProcessorInstantiationException { - return createProcessor(type, id, true); + public ProcessorNode createProcessor(final String type, final String id, final BundleCoordinate coordinate) throws ProcessorInstantiationException { + return createProcessor(type, id, coordinate, true); } /** @@ -1029,6 +1039,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * * @param type the fully qualified Processor class name * @param id the unique ID of the Processor + * @param coordinate the bundle coordinate for this processor * @param firstTimeAdded whether or not this is the first time this * Processor is added to the graph. If {@code true}, will invoke methods * annotated with the {@link OnAdded} annotation. @@ -1037,39 +1048,63 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @throws ProcessorInstantiationException if the processor cannot be * instantiated for any reason */ - public ProcessorNode createProcessor(final String type, String id, final boolean firstTimeAdded) throws ProcessorInstantiationException { + public ProcessorNode createProcessor(final String type, String id, final BundleCoordinate coordinate, final boolean firstTimeAdded) throws ProcessorInstantiationException { + return createProcessor(type, id, coordinate, firstTimeAdded, true); + } + + /** + * <p> + * Creates a new ProcessorNode with the given type and identifier and + * optionally initializes it. + * </p> + * + * @param type the fully qualified Processor class name + * @param id the unique ID of the Processor + * @param coordinate the bundle coordinate for this processor + * @param firstTimeAdded whether or not this is the first time this + * Processor is added to the graph. If {@code true}, will invoke methods + * annotated with the {@link OnAdded} annotation. + * @return new processor node + * @throws NullPointerException if either arg is null + * @throws ProcessorInstantiationException if the processor cannot be + * instantiated for any reason + */ + public ProcessorNode createProcessor(final String type, String id, final BundleCoordinate coordinate, final boolean firstTimeAdded, final boolean registerLogObserver) + throws ProcessorInstantiationException { id = id.intern(); boolean creationSuccessful; - Processor processor; + LoggableComponent<Processor> processor; try { - processor = instantiateProcessor(type, id); + processor = instantiateProcessor(type, id, coordinate); creationSuccessful = true; } catch (final ProcessorInstantiationException pie) { LOG.error("Could not create Processor of type " + type + " for ID " + id + "; creating \"Ghost\" implementation", pie); final GhostProcessor ghostProc = new GhostProcessor(); ghostProc.setIdentifier(id); ghostProc.setCanonicalClassName(type); - processor = ghostProc; + processor = new LoggableComponent<>(ghostProc, coordinate, null); creationSuccessful = false; } - final ComponentLog logger = new SimpleProcessLogger(id, processor); final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, variableRegistry); final ProcessorNode procNode; if (creationSuccessful) { - procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, nifiProperties, variableRegistry, logger); + procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, nifiProperties, variableRegistry); } else { final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; final String componentType = "(Missing) " + simpleClassName; - procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, componentType, type, nifiProperties, variableRegistry, logger); + procNode = new StandardProcessorNode( + processor, id, validationContextFactory, processScheduler, controllerServiceProvider, componentType, type, nifiProperties, variableRegistry, true); } final LogRepository logRepository = LogRepositoryFactory.getRepository(id); - logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode)); + if (registerLogObserver) { + logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode)); + } try { - final Class<?> procClass = processor.getClass(); + final Class<?> procClass = procNode.getProcessor().getClass(); if(procClass.isAnnotationPresent(DefaultSettings.class)) { DefaultSettings ds = procClass.getAnnotation(DefaultSettings.class); try { @@ -1083,27 +1118,33 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } catch(Throwable ex) { LOG.error(String.format("Error while setting penalty duration from DefaultSettings annotation:%s",ex.getMessage()),ex); } - try { - procNode.setBulletinLevel(ds.bulletinLevel()); - } catch (Throwable ex) { - LOG.error(String.format("Error while setting bulletin level from DefaultSettings annotation:%s",ex.getMessage()),ex); - } + // calling setBulletinLevel changes the level in the LogRepository so we only want to do this when + // the caller said to register the log observer, otherwise we could be changing the level when we didn't mean to + if (registerLogObserver) { + try { + procNode.setBulletinLevel(ds.bulletinLevel()); + } catch (Throwable ex) { + LOG.error(String.format("Error while setting bulletin level from DefaultSettings annotation:%s", ex.getMessage()), ex); + } + } } } catch (Throwable ex) { LOG.error(String.format("Error while setting default settings from DefaultSettings annotation: %s",ex.getMessage()),ex); } if (firstTimeAdded) { - try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { - ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor); + try (final NarCloseable x = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) { + ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, procNode.getProcessor()); } catch (final Exception e) { - logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID); + if (registerLogObserver) { + logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID); + } throw new ComponentLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e); } if (firstTimeAdded) { - try (final NarCloseable nc = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), processor.getIdentifier())) { + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor()); } } @@ -1112,30 +1153,28 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return procNode; } - private Processor instantiateProcessor(final String type, final String identifier) throws ProcessorInstantiationException { - Processor processor; + private LoggableComponent<Processor> instantiateProcessor(final String type, final String identifier, final BundleCoordinate bundleCoordinate) throws ProcessorInstantiationException { + final Bundle processorBundle = ExtensionManager.getBundle(bundleCoordinate); + if (processorBundle == null) { + throw new ProcessorInstantiationException("Unable to find bundle for coordinate " + bundleCoordinate.getCoordinate()); + } final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); try { - final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type, identifier); - final Class<?> rawClass; - if (detectedClassLoaderForType == null) { - // try to find from the current class loader - rawClass = Class.forName(type); - } else { - // try to find from the registered classloader for that type - rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type, identifier)); - } - + final ClassLoader detectedClassLoaderForType = ExtensionManager.createInstanceClassLoader(type, identifier, processorBundle); + final Class<?> rawClass = Class.forName(type, true, processorBundle.getClassLoader()); Thread.currentThread().setContextClassLoader(detectedClassLoaderForType); + final Class<? extends Processor> processorClass = rawClass.asSubclass(Processor.class); - processor = processorClass.newInstance(); + final Processor processor = processorClass.newInstance(); + final ComponentLog componentLogger = new SimpleProcessLogger(identifier, processor); final ProcessorInitializationContext ctx = new StandardProcessorInitializationContext(identifier, componentLogger, this, this, nifiProperties); processor.initialize(ctx); LogRepositoryFactory.getRepository(identifier).setLogger(componentLogger); - return processor; + + return new LoggableComponent<>(processor, bundleCoordinate, componentLogger); } catch (final Throwable t) { throw new ProcessorInstantiationException(type, t); } finally { @@ -1145,6 +1184,34 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + public void changeProcessorType(final ProcessorNode existingNode, final String newType, final BundleCoordinate bundleCoordinate) throws ProcessorInstantiationException { + if (existingNode == null) { + throw new IllegalStateException("Existing ProcessorNode cannot be null"); + } + + final String id = existingNode.getProcessor().getIdentifier(); + + // createProcessor will create a new instance class loader for the same id so + // save the instance class loader to use it for calling OnRemoved on the existing processor + final ClassLoader existingInstanceClassLoader = ExtensionManager.getInstanceClassLoader(id); + + // create a new node with firstTimeAdded as true so lifecycle methods get fired + // attempt the creation to make sure it works before firing the OnRemoved methods below + final ProcessorNode newNode = createProcessor(newType, id, bundleCoordinate, true, false); + + // call OnRemoved for the existing processor using the previous instance class loader + try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) { + final StandardProcessContext processContext = new StandardProcessContext( + existingNode, controllerServiceProvider, encryptor, getStateManagerProvider().getStateManager(id), variableRegistry); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getProcessor(), processContext); + } + + // set the new processor in the existing node + final LoggableComponent<Processor> newProcessor = new LoggableComponent<>(newNode.getProcessor(), newNode.getBundleCoordinate(), newNode.getLogger()); + existingNode.setProcessor(newProcessor); + existingNode.setExtensionMissing(newNode.isExtensionMissing()); + } + /** * @return the ExtensionManager used for instantiating Processors, * Prioritizers, etc. @@ -1459,13 +1526,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @throws FlowSynchronizationException if updates to the controller failed. * If this exception is thrown, then the controller should be considered * unsafe to be used + * @throws MissingBundleException if the proposed flow cannot be loaded by the + * controller because it contains a bundle that does not exist in the controller */ public void synchronize(final FlowSynchronizer synchronizer, final DataFlow dataFlow) - throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException { + throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException { writeLock.lock(); try { LOG.debug("Synchronizing controller with proposed flow"); synchronizer.sync(this, dataFlow, encryptor); + flowSynchronized.set(true); LOG.info("Successfully synchronized controller with proposed flow"); } finally { writeLock.unlock(); @@ -1630,7 +1700,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // Instantiate Controller Services // for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) { - final ControllerServiceNode serviceNode = createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), true); + final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(controllerServiceDTO.getType(), controllerServiceDTO.getBundle()); + final ControllerServiceNode serviceNode = createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), bundleCoordinate, true); serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData()); serviceNode.setComments(controllerServiceDTO.getComments()); @@ -1717,7 +1788,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // Instantiate the processors // for (final ProcessorDTO processorDTO : dto.getProcessors()) { - final ProcessorNode procNode = createProcessor(processorDTO.getType(), processorDTO.getId()); + final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(processorDTO.getType(), processorDTO.getBundle()); + final ProcessorNode procNode = createProcessor(processorDTO.getType(), processorDTO.getId(), bundleCoordinate); procNode.setPosition(toPosition(processorDTO.getPosition())); procNode.setProcessGroup(group); @@ -1953,45 +2025,83 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + private void verifyBundleInSnippet(final BundleDTO requiredBundle, final Set<BundleCoordinate> supportedBundles) { + final BundleCoordinate requiredCoordinate = new BundleCoordinate(requiredBundle.getGroup(), requiredBundle.getArtifact(), requiredBundle.getVersion()); + if (!supportedBundles.contains(requiredCoordinate)) { + throw new IllegalStateException("Unsupported bundle: " + requiredCoordinate); + } + } + + private void verifyProcessorsInSnippet(final FlowSnippetDTO templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) { + if (templateContents.getProcessors() != null) { + templateContents.getProcessors().forEach(processor -> { + if (processor.getBundle() == null) { + throw new IllegalArgumentException("Processor bundle must be specified."); + } + + if (supportedTypes.containsKey(processor.getType())) { + verifyBundleInSnippet(processor.getBundle(), supportedTypes.get(processor.getType())); + } else { + throw new IllegalStateException("Invalid Processor Type: " + processor.getType()); + } + }); + } + + if (templateContents.getProcessGroups() != null) { + templateContents.getProcessGroups().forEach(processGroup -> { + verifyProcessorsInSnippet(processGroup.getContents(), supportedTypes); + }); + } + } + + private void verifyControllerServicesInSnippet(final FlowSnippetDTO templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) { + if (templateContents.getControllerServices() != null) { + templateContents.getControllerServices().forEach(controllerService -> { + if (supportedTypes.containsKey(controllerService.getType())) { + if (controllerService.getBundle() == null) { + throw new IllegalArgumentException("Controller Service bundle must be specified."); + } + + verifyBundleInSnippet(controllerService.getBundle(), supportedTypes.get(controllerService.getType())); + } else { + throw new IllegalStateException("Invalid Controller Service Type: " + controllerService.getType()); + } + }); + } + + if (templateContents.getProcessGroups() != null) { + templateContents.getProcessGroups().forEach(processGroup -> { + verifyControllerServicesInSnippet(processGroup.getContents(), supportedTypes); + }); + } + } + public void verifyComponentTypesInSnippet(final FlowSnippetDTO templateContents) { - // validate that all Processor Types and Prioritizer Types are valid - final Set<String> processorClasses = new HashSet<>(); + final Map<String, Set<BundleCoordinate>> processorClasses = new HashMap<>(); for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) { - processorClasses.add(c.getName()); + final String name = c.getName(); + processorClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet())); } + verifyProcessorsInSnippet(templateContents, processorClasses); + + final Map<String, Set<BundleCoordinate>> controllerServiceClasses = new HashMap<>(); + for (final Class<?> c : ExtensionManager.getExtensions(ControllerService.class)) { + final String name = c.getName(); + controllerServiceClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet())); + } + verifyControllerServicesInSnippet(templateContents, controllerServiceClasses); + final Set<String> prioritizerClasses = new HashSet<>(); for (final Class<?> c : ExtensionManager.getExtensions(FlowFilePrioritizer.class)) { prioritizerClasses.add(c.getName()); } - final Set<String> controllerServiceClasses = new HashSet<>(); - for (final Class<?> c : ExtensionManager.getExtensions(ControllerService.class)) { - controllerServiceClasses.add(c.getName()); - } - final Set<ProcessorDTO> allProcs = new HashSet<>(); final Set<ConnectionDTO> allConns = new HashSet<>(); - allProcs.addAll(templateContents.getProcessors()); allConns.addAll(templateContents.getConnections()); for (final ProcessGroupDTO childGroup : templateContents.getProcessGroups()) { - allProcs.addAll(findAllProcessors(childGroup)); allConns.addAll(findAllConnections(childGroup)); } - for (final ProcessorDTO proc : allProcs) { - if (!processorClasses.contains(proc.getType())) { - throw new IllegalStateException("Invalid Processor Type: " + proc.getType()); - } - } - - final Set<ControllerServiceDTO> controllerServices = templateContents.getControllerServices(); - if (controllerServices != null) { - for (final ControllerServiceDTO service : controllerServices) { - if (!controllerServiceClasses.contains(service.getType())) { - throw new IllegalStateException("Invalid Controller Service Type: " + service.getType()); - } - } - } - for (final ConnectionDTO conn : allConns) { final List<String> prioritizers = conn.getPrioritizers(); if (prioritizers != null) { @@ -2047,24 +2157,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Recursively finds all ProcessorDTO's - * - * @param group group - * @return processor dto set - */ - private Set<ProcessorDTO> findAllProcessors(final ProcessGroupDTO group) { - final Set<ProcessorDTO> procs = new HashSet<>(); - for (final ProcessorDTO dto : group.getContents().getProcessors()) { - procs.add(dto); - } - - for (final ProcessGroupDTO childGroup : group.getContents().getProcessGroups()) { - procs.addAll(findAllProcessors(childGroup)); - } - return procs; - } - - /** * Recursively finds all ConnectionDTO's * * @param group group @@ -2110,16 +2202,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); try { - final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type); - final Class<?> rawClass; - if (detectedClassLoaderForType == null) { - // try to find from the current class loader - rawClass = Class.forName(type); - } else { - // try to find from the registered classloader for that type - rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type)); + final List<Bundle> prioritizerBundles = ExtensionManager.getBundles(type); + if (prioritizerBundles.size() == 0) { + throw new IllegalStateException(String.format("The specified class '%s' is not known to this nifi.", type)); + } + if (prioritizerBundles.size() > 1) { + throw new IllegalStateException(String.format("Multiple bundles found for the specified class '%s', only one is allowed.", type)); } + final Bundle bundle = prioritizerBundles.get(0); + final ClassLoader detectedClassLoaderForType = bundle.getClassLoader(); + final Class<?> rawClass = Class.forName(type, true, detectedClassLoaderForType); + Thread.currentThread().setContextClassLoader(detectedClassLoaderForType); final Class<? extends FlowFilePrioritizer> prioritizerClass = rawClass.asSubclass(FlowFilePrioritizer.class); final Object processorObj = prioritizerClass.newInstance(); @@ -2772,6 +2866,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return initialized.get(); } + public boolean isFlowSynchronized() { + return flowSynchronized.get(); + } + public void startConnectable(final Connectable connectable) { final ProcessGroup group = requireNonNull(connectable).getProcessGroup(); @@ -2835,83 +2933,66 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R lookupGroup(groupId).stopProcessing(); } - public ReportingTaskNode createReportingTask(final String type) throws ReportingTaskInstantiationException { - return createReportingTask(type, true); + public ReportingTaskNode createReportingTask(final String type, final BundleCoordinate bundleCoordinate) throws ReportingTaskInstantiationException { + return createReportingTask(type, bundleCoordinate, true); } - public ReportingTaskNode createReportingTask(final String type, final boolean firstTimeAdded) throws ReportingTaskInstantiationException { - return createReportingTask(type, UUID.randomUUID().toString(), firstTimeAdded); + public ReportingTaskNode createReportingTask(final String type, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) throws ReportingTaskInstantiationException { + return createReportingTask(type, UUID.randomUUID().toString(), bundleCoordinate, firstTimeAdded); } @Override - public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException { - return createReportingTask(type, id, firstTimeAdded, true); + public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate,final boolean firstTimeAdded) throws ReportingTaskInstantiationException { + return createReportingTask(type, id, bundleCoordinate, firstTimeAdded, true); } - public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded, final boolean register) throws ReportingTaskInstantiationException { - if (type == null || id == null) { + public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded, final boolean register) + throws ReportingTaskInstantiationException { + if (type == null || id == null || bundleCoordinate == null) { throw new NullPointerException(); } - ReportingTask task = null; + LoggableComponent<ReportingTask> task = null; boolean creationSuccessful = true; - final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); try { - final ClassLoader detectedClassLoader = ExtensionManager.getClassLoader(type, id); - final Class<?> rawClass; - if (detectedClassLoader == null) { - rawClass = Class.forName(type); - } else { - rawClass = Class.forName(type, false, detectedClassLoader); - } - - Thread.currentThread().setContextClassLoader(detectedClassLoader); - final Class<? extends ReportingTask> reportingTaskClass = rawClass.asSubclass(ReportingTask.class); - final Object reportingTaskObj = reportingTaskClass.newInstance(); - task = reportingTaskClass.cast(reportingTaskObj); + task = instantiateReportingTask(type, id, bundleCoordinate); } catch (final Exception e) { LOG.error("Could not create Reporting Task of type " + type + " for ID " + id + "; creating \"Ghost\" implementation", e); final GhostReportingTask ghostTask = new GhostReportingTask(); ghostTask.setIdentifier(id); ghostTask.setCanonicalClassName(type); - task = ghostTask; + task = new LoggableComponent<>(ghostTask, bundleCoordinate, null); creationSuccessful = false; - } finally { - if (ctxClassLoader != null) { - Thread.currentThread().setContextClassLoader(ctxClassLoader); - } } - final ComponentLog logger = new SimpleProcessLogger(id, task); final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, variableRegistry); final ReportingTaskNode taskNode; if (creationSuccessful) { - taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, variableRegistry, logger); + taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, variableRegistry); } else { final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; final String componentType = "(Missing) " + simpleClassName; - taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type, variableRegistry, logger); + taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type, variableRegistry, true); } - taskNode.setName(task.getClass().getSimpleName()); + taskNode.setName(taskNode.getReportingTask().getClass().getSimpleName()); if (firstTimeAdded) { - final ComponentLog componentLog = new SimpleProcessLogger(id, taskNode.getReportingTask()); final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(), - SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this, nifiProperties); + SchedulingStrategy.TIMER_DRIVEN, "1 min", taskNode.getLogger(), this, nifiProperties); try { - task.initialize(config); + taskNode.getReportingTask().initialize(config); } catch (final InitializationException ie) { throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + type, ie); } try (final NarCloseable x = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass(), taskNode.getReportingTask().getIdentifier())) { - ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task); + ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, taskNode.getReportingTask()); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask()); } catch (final Exception e) { - throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + task, e); + throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + taskNode.getReportingTask(), e); } } @@ -2927,6 +3008,63 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return taskNode; } + private LoggableComponent<ReportingTask> instantiateReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate) + throws ReportingTaskInstantiationException { + + final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); + try { + final Bundle reportingTaskBundle = ExtensionManager.getBundle(bundleCoordinate); + if (reportingTaskBundle == null) { + throw new IllegalStateException("Unable to find bundle for coordinate " + bundleCoordinate.getCoordinate()); + } + + final ClassLoader detectedClassLoader = ExtensionManager.createInstanceClassLoader(type, id, reportingTaskBundle); + final Class<?> rawClass = Class.forName(type, false, detectedClassLoader); + Thread.currentThread().setContextClassLoader(detectedClassLoader); + + final Class<? extends ReportingTask> reportingTaskClass = rawClass.asSubclass(ReportingTask.class); + final Object reportingTaskObj = reportingTaskClass.newInstance(); + + final ReportingTask reportingTask = reportingTaskClass.cast(reportingTaskObj); + final ComponentLog componentLog = new SimpleProcessLogger(id, reportingTask); + + return new LoggableComponent<>(reportingTask, bundleCoordinate, componentLog); + } catch (final Exception e) { + throw new ReportingTaskInstantiationException(type, e); + } finally { + if (ctxClassLoader != null) { + Thread.currentThread().setContextClassLoader(ctxClassLoader); + } + } + } + + @Override + public void changeReportingTaskType(final ReportingTaskNode existingNode, final String newType, final BundleCoordinate bundleCoordinate) throws ReportingTaskInstantiationException { + if (existingNode == null) { + throw new IllegalStateException("Existing ReportingTaskNode cannot be null"); + } + + final String id = existingNode.getReportingTask().getIdentifier(); + + // createReportingTask will create a new instance class loader for the same id so + // save the instance class loader to use it for calling OnRemoved on the existing processor + final ClassLoader existingInstanceClassLoader = ExtensionManager.getInstanceClassLoader(id); + + // set firstTimeAdded to true so lifecycle annotations get fired, but don't register this node + // attempt the creation to make sure it works before firing the OnRemoved methods below + final ReportingTaskNode newNode = createReportingTask(newType, id, bundleCoordinate, true, false); + + // call OnRemoved for the existing reporting task using the previous instance class loader + try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getReportingTask(), existingNode.getConfigurationContext()); + } + + // set the new reporting task into the existing node + final LoggableComponent<ReportingTask> newReportingTask = new LoggableComponent<>(newNode.getReportingTask(), newNode.getBundleCoordinate(), newNode.getLogger()); + existingNode.setReportingTask(newReportingTask); + existingNode.setExtensionMissing(newNode.isExtensionMissing()); + } + @Override public ReportingTaskNode getReportingTaskNode(final String taskId) { return reportingTasks.get(taskId); @@ -2988,8 +3126,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } @Override - public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) { - final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, firstTimeAdded); + public ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) { + final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, bundleCoordinate, firstTimeAdded); // Register log observer to provide bulletins when reporting task logs anything at WARN level or above final LogRepository logRepository = LogRepositoryFactory.getRepository(id); @@ -3007,6 +3145,42 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return serviceNode; } + public void changeControllerServiceType(final ControllerServiceNode existingNode, final String newType, final BundleCoordinate bundleCoordinate) + throws ControllerServiceInstantiationException { + if (existingNode == null) { + throw new IllegalStateException("Existing ControllerServiceNode cannot be null"); + } + + final String id = existingNode.getIdentifier(); + + // createControllerService will create a new instance class loader for the same id so + // save the instance class loader to use it for calling OnRemoved on the existing service + final ClassLoader existingInstanceClassLoader = ExtensionManager.getInstanceClassLoader(id); + + // create a new node with firstTimeAdded as true so lifecycle methods get called + // attempt the creation to make sure it works before firing the OnRemoved methods below + final ControllerServiceNode newNode = controllerServiceProvider.createControllerService(newType, id, bundleCoordinate, true); + + // call OnRemoved for the existing service using the previous instance class loader + try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) { + final ConfigurationContext configurationContext = new StandardConfigurationContext(existingNode, controllerServiceProvider, null, variableRegistry); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getControllerServiceImplementation(), configurationContext); + } + + // take the invocation handler that was created for new proxy and is set to look at the new node, + // and set it to look at the existing node + final ControllerServiceInvocationHandler invocationHandler = newNode.getInvocationHandler(); + invocationHandler.setServiceNode(existingNode); + + // create LoggableComponents for the proxy and implementation + final LoggableComponent<ControllerService> loggableProxy = new LoggableComponent<>(newNode.getProxiedControllerService(), bundleCoordinate, newNode.getLogger()); + final LoggableComponent<ControllerService> loggableImplementation = new LoggableComponent<>(newNode.getControllerServiceImplementation(), bundleCoordinate, newNode.getLogger()); + + // set the new impl, proxy, and invocation handler into the existing node + existingNode.setControllerServiceAndProxy(loggableImplementation, loggableProxy, invocationHandler); + existingNode.setExtensionMissing(newNode.isExtensionMissing()); + } + @Override public void enableReportingTask(final ReportingTaskNode reportingTaskNode) { reportingTaskNode.verifyCanEnable();
http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/MissingBundleException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/MissingBundleException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/MissingBundleException.java new file mode 100644 index 0000000..175f252 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/MissingBundleException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import org.apache.nifi.cluster.ConnectionException; + +/** + * Represents the exceptional case when a node fails to join the cluster because a bundle being used by the cluster does not exist on the node. + */ +public class MissingBundleException extends ConnectionException { + + private static final long serialVersionUID = 198234798234794L; + + public MissingBundleException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + + public MissingBundleException(Throwable cause) { + super(cause); + } + + public MissingBundleException(String message, Throwable cause) { + super(message, cause); + } + + public MissingBundleException(String message) { + super(message); + } + + public MissingBundleException() { + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java new file mode 100644 index 0000000..687d5c4 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.Processor; + +/** + * Holder for StandardProcessorNode to atomically swap out the component. + */ +public class ProcessorDetails { + + private final Processor processor; + private final Class<?> procClass; + private final boolean triggerWhenEmpty; + private final boolean sideEffectFree; + private final boolean triggeredSerially; + private final boolean triggerWhenAnyDestinationAvailable; + private final boolean eventDrivenSupported; + private final boolean batchSupported; + private final InputRequirement.Requirement inputRequirement; + private final ComponentLog componentLog; + private final BundleCoordinate bundleCoordinate; + + public ProcessorDetails(final LoggableComponent<Processor> processor) { + this.processor = processor.getComponent(); + this.componentLog = processor.getLogger(); + this.bundleCoordinate = processor.getBundleCoordinate(); + + this.procClass = this.processor.getClass(); + this.triggerWhenEmpty = procClass.isAnnotationPresent(TriggerWhenEmpty.class); + this.sideEffectFree = procClass.isAnnotationPresent(SideEffectFree.class); + this.batchSupported = procClass.isAnnotationPresent(SupportsBatching.class); + this.triggeredSerially = procClass.isAnnotationPresent(TriggerSerially.class); + this.triggerWhenAnyDestinationAvailable = procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class); + this.eventDrivenSupported = procClass.isAnnotationPresent(EventDriven.class) && !triggeredSerially && !triggerWhenEmpty; + + final boolean inputRequirementPresent = procClass.isAnnotationPresent(InputRequirement.class); + if (inputRequirementPresent) { + this.inputRequirement = procClass.getAnnotation(InputRequirement.class).value(); + } else { + this.inputRequirement = InputRequirement.Requirement.INPUT_ALLOWED; + } + } + + public Processor getProcessor() { + return processor; + } + + public Class<?> getProcClass() { + return procClass; + } + + public boolean isTriggerWhenEmpty() { + return triggerWhenEmpty; + } + + public boolean isSideEffectFree() { + return sideEffectFree; + } + + public boolean isTriggeredSerially() { + return triggeredSerially; + } + + public boolean isTriggerWhenAnyDestinationAvailable() { + return triggerWhenAnyDestinationAvailable; + } + + public boolean isEventDrivenSupported() { + return eventDrivenSupported; + } + + public boolean isBatchSupported() { + return batchSupported; + } + + public InputRequirement.Requirement getInputRequirement() { + return inputRequirement; + } + + public ComponentLog getComponentLog() { + return componentLog; + } + + public BundleCoordinate getBundleCoordinate() { + return bundleCoordinate; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 2d35a63..0ce6742 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -16,39 +16,10 @@ */ package org.apache.nifi.controller; -import java.io.BufferedInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer; import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.bundle.Bundle; import org.apache.nifi.cluster.ConnectionException; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; @@ -94,6 +65,38 @@ import org.apache.nifi.web.revision.RevisionManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + public class StandardFlowService implements FlowService, ProtocolHandler { private static final String EVENT_CATEGORY = "Controller"; @@ -135,11 +138,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler { */ private NodeIdentifier nodeId; - private final NiFiProperties nifiProperties; - // guardedBy rwLock private boolean firstControllerInitialization = true; + private final NiFiProperties nifiProperties; + private static final String CONNECTION_EXCEPTION_MSG_PREFIX = "Failed to connect node to cluster because "; private static final Logger logger = LoggerFactory.getLogger(StandardFlowService.class); @@ -434,9 +437,28 @@ public class StandardFlowService implements FlowService, ProtocolHandler { } @Override - public void load(final DataFlow dataFlow) throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException { + public void load(final DataFlow dataFlow) throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException { if (configuredForClustering) { - final DataFlow proposedFlow = (dataFlow == null) ? createDataFlow() : dataFlow; + // Create the initial flow from disk if it exists, or from serializing the empty root group in flow controller + final DataFlow initialFlow = (dataFlow == null) ? createDataFlow() : dataFlow; + if (logger.isTraceEnabled()) { + logger.trace("InitialFlow = " + new String(initialFlow.getFlow(), StandardCharsets.UTF_8)); + } + + // Sync the initial flow into the flow controller so that if the flow came from disk we loaded the + // whole flow into the flow controller and applied any bundle upgrades + writeLock.lock(); + try { + loadFromBytes(initialFlow, true); + } finally { + writeLock.unlock(); + } + + // Get the proposed flow by serializing the flow controller which now has the synced version from above + final DataFlow proposedFlow = createDataFlowFromController(); + if (logger.isTraceEnabled()) { + logger.trace("ProposedFlow = " + new String(proposedFlow.getFlow(), StandardCharsets.UTF_8)); + } /* * Attempt to connect to the cluster. If the manager is able to @@ -457,9 +479,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler { if (response == null || response.shouldTryLater()) { logger.info("Flow controller will load local dataflow and suspend connection handshake until a cluster connection response is received."); - // load local proposed flow - loadFromBytes(proposedFlow, false); - // set node ID on controller before we start heartbeating because heartbeat needs node ID controller.setNodeId(nodeId); clusterCoordinator.setLocalNodeIdentifier(nodeId); @@ -479,6 +498,9 @@ public class StandardFlowService implements FlowService, ProtocolHandler { */ controller.startHeartbeating(); + // Initialize the controller after the flow is loaded so we don't take any actions on repos until everything is good + initializeController(); + // notify controller that flow is initialized try { controller.onFlowInitialized(autoResumeState); @@ -491,21 +513,26 @@ public class StandardFlowService implements FlowService, ProtocolHandler { } else { try { loadFromConnectionResponse(response); - dao.save(controller, true); } catch (final Exception e) { logger.error("Failed to load flow from cluster due to: " + e, e); handleConnectionFailure(e); throw new IOException(e); } } + + // save the flow in the controller so we write out the latest flow with any updated bundles to disk + dao.save(controller, true); + } finally { writeLock.unlock(); } } else { writeLock.lock(); try { - // operating in standalone mode, so load proposed flow + // operating in standalone mode, so load proposed flow and initialize the controller loadFromBytes(dataFlow, true); + initializeController(); + dao.save(controller, true); } finally { writeLock.unlock(); } @@ -516,6 +543,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler { DisconnectionCode disconnectionCode; if (ex instanceof UninheritableFlowException) { disconnectionCode = DisconnectionCode.MISMATCHED_FLOWS; + } else if (ex instanceof MissingBundleException) { + disconnectionCode = DisconnectionCode.MISSING_BUNDLE; } else if (ex instanceof FlowSynchronizationException) { disconnectionCode = DisconnectionCode.MISMATCHED_FLOWS; } else { @@ -533,7 +562,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // create the response final FlowResponseMessage response = new FlowResponseMessage(); - response.setDataFlow(createDataFlow()); + response.setDataFlow(createDataFlowFromController()); return response; } catch (final Exception ex) { throw new ProtocolException("Failed serializing flow controller state for flow request due to: " + ex, ex); @@ -549,15 +578,15 @@ public class StandardFlowService implements FlowService, ProtocolHandler { @Override public StandardDataFlow createDataFlow() throws IOException { - final byte[] snippetBytes = controller.getSnippetManager().export(); - final byte[] authorizerFingerprint = getAuthorizerFingerprint(); - // Load the flow from disk if the file exists. if (dao.isFlowPresent()) { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); dao.load(baos); final byte[] bytes = baos.toByteArray(); - final StandardDataFlow fromDisk = new StandardDataFlow(bytes, snippetBytes, authorizerFingerprint); + + final byte[] snippetBytes = controller.getSnippetManager().export(); + final byte[] authorizerFingerprint = getAuthorizerFingerprint(); + final StandardDataFlow fromDisk = new StandardDataFlow(bytes, snippetBytes, authorizerFingerprint, new HashSet<>()); return fromDisk; } @@ -566,14 +595,28 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // will automatically create a Root Process Group, and we need to ensure that // we replicate that Process Group to all nodes in the cluster, so that they all // end up with the same ID for the root Process Group. + return createDataFlowFromController(); + } + + @Override + public StandardDataFlow createDataFlowFromController() throws IOException { + final byte[] snippetBytes = controller.getSnippetManager().export(); + final byte[] authorizerFingerprint = getAuthorizerFingerprint(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); dao.save(controller, baos); final byte[] flowBytes = baos.toByteArray(); baos.reset(); - return new StandardDataFlow(flowBytes, snippetBytes, authorizerFingerprint); + final Set<String> missingComponents = new HashSet<>(); + controller.getRootGroup().findAllProcessors().stream().filter(p -> p.isExtensionMissing()).forEach(p -> missingComponents.add(p.getIdentifier())); + controller.getAllControllerServices().stream().filter(cs -> cs.isExtensionMissing()).forEach(cs -> missingComponents.add(cs.getIdentifier())); + controller.getAllReportingTasks().stream().filter(r -> r.isExtensionMissing()).forEach(r -> missingComponents.add(r.getIdentifier())); + + return new StandardDataFlow(flowBytes, snippetBytes, authorizerFingerprint, missingComponents); } + private NodeIdentifier getNodeId() { readLock.lock(); try { @@ -593,7 +636,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { if (connectionResponse.getDataFlow() == null) { logger.info("Received a Reconnection Request that contained no DataFlow. Will attempt to connect to cluster using local flow."); - connectionResponse = connect(false, false, createDataFlow()); + connectionResponse = connect(false, false, createDataFlowFromController()); } loadFromConnectionResponse(connectionResponse); @@ -623,7 +666,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { writeLock.lock(); try { - logger.info("Disconnecting node."); + logger.info("Disconnecting node due to " + explanation); // mark node as not connected controller.setConnectionStatus(new NodeConnectionStatus(nodeId, DisconnectionCode.UNKNOWN, explanation)); @@ -638,7 +681,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { controller.setClustered(false, null); clusterCoordinator.setConnected(false); - logger.info("Node disconnected."); + logger.info("Node disconnected due to " + explanation); } finally { writeLock.unlock(); @@ -647,31 +690,30 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // write lock must already be acquired private void loadFromBytes(final DataFlow proposedFlow, final boolean allowEmptyFlow) - throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException { + throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException { logger.trace("Loading flow from bytes"); // resolve the given flow (null means load flow from disk) final DataFlow actualProposedFlow; final byte[] flowBytes; final byte[] authorizerFingerprint; + final Set<String> missingComponents; + if (proposedFlow == null) { final ByteArrayOutputStream flowOnDisk = new ByteArrayOutputStream(); copyCurrentFlow(flowOnDisk); flowBytes = flowOnDisk.toByteArray(); authorizerFingerprint = getAuthorizerFingerprint(); + missingComponents = new HashSet<>(); logger.debug("Loaded Flow from bytes"); } else { flowBytes = proposedFlow.getFlow(); authorizerFingerprint = proposedFlow.getAuthorizerFingerprint(); + missingComponents = proposedFlow.getMissingComponents(); logger.debug("Loaded flow from proposed flow"); } - actualProposedFlow = new StandardDataFlow(flowBytes, null, authorizerFingerprint); - - if (firstControllerInitialization) { - // load the controller services - logger.debug("Loading controller services"); - } + actualProposedFlow = new StandardDataFlow(flowBytes, null, authorizerFingerprint, missingComponents); // load the flow logger.debug("Loading proposed flow into FlowController"); @@ -682,6 +724,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler { throw new FlowSynchronizationException("Failed to load flow because unable to connect to cluster and local flow is empty"); } + + final List<Template> templates = loadTemplates(); for (final Template template : templates) { final Template existing = rootGroup.getTemplate(template.getIdentifier()); @@ -692,16 +736,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler { logger.info("Template '{}' was already present in Root Group so will not import from file", template.getDetails().getName()); } } - - // lazy initialization of controller tasks and flow - if (firstControllerInitialization) { - logger.debug("First controller initialization. Loading reporting tasks and initializing controller."); - - // initialize the flow - controller.initializeFlow(); - - firstControllerInitialization = false; - } } /** @@ -867,6 +901,9 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // get the dataflow from the response final DataFlow dataFlow = response.getDataFlow(); + if (logger.isTraceEnabled()) { + logger.trace("ResponseFlow = " + new String(dataFlow.getFlow(), StandardCharsets.UTF_8)); + } // load new controller state loadFromBytes(dataFlow, true); @@ -884,6 +921,9 @@ public class StandardFlowService implements FlowService, ProtocolHandler { controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED)); + // Initialize the controller after the flow is loaded so we don't take any actions on repos until everything is good + initializeController(); + // start the processors as indicated by the dataflow controller.onFlowInitialized(autoResumeState); @@ -892,6 +932,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler { controller.startHeartbeating(); } catch (final UninheritableFlowException ufe) { throw new UninheritableFlowException(CONNECTION_EXCEPTION_MSG_PREFIX + "local flow is different than cluster flow.", ufe); + } catch (final MissingBundleException mbe) { + throw new MissingBundleException(CONNECTION_EXCEPTION_MSG_PREFIX + "cluster flow contains bundles that do not exist on the current node", mbe); } catch (final FlowSerializationException fse) { throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX + "local or cluster flow is malformed.", fse); } catch (final FlowSynchronizationException fse) { @@ -905,6 +947,14 @@ public class StandardFlowService implements FlowService, ProtocolHandler { } + private void initializeController() throws IOException { + if (firstControllerInitialization) { + logger.debug("First controller initialization, initializing controller..."); + controller.initializeFlow(); + firstControllerInitialization = false; + } + } + @Override public void copyCurrentFlow(final OutputStream os) throws IOException { readLock.lock(); @@ -939,9 +989,15 @@ public class StandardFlowService implements FlowService, ProtocolHandler { @Override public void run() { - final ClassLoader currentCl = Thread.currentThread().getContextClassLoader(); - final ClassLoader cl = NarClassLoaders.getInstance().getFrameworkClassLoader(); - Thread.currentThread().setContextClassLoader(cl); + ClassLoader currentCl = null; + + final Bundle frameworkBundle = NarClassLoaders.getInstance().getFrameworkBundle(); + if (frameworkBundle != null) { + currentCl = Thread.currentThread().getContextClassLoader(); + final ClassLoader cl = frameworkBundle.getClassLoader(); + Thread.currentThread().setContextClassLoader(cl); + } + try { //Hang onto the SaveHolder here rather than setting it to null because if the save fails we will try again final SaveHolder holder = StandardFlowService.this.saveHolder.get();
