http://git-wip-us.apache.org/repos/asf/nifi/blob/b974445d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --cc 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 3c816d0,0c39eda..f69c510
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@@ -74,1241 -74,1242 +74,1241 @@@ import org.slf4j.LoggerFactory
   */
  public class StandardProcessorNode extends ProcessorNode implements 
Connectable {
  
 -      public static final String BULLETIN_OBSERVER_ID = "bulletin-observer";
 -
 -      public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
 -      public static final String DEFAULT_YIELD_PERIOD = "1 sec";
 -      public static final String DEFAULT_PENALIZATION_PERIOD = "30 sec";
 -      private final AtomicReference<ProcessGroup> processGroup;
 -      private final Processor processor;
 -      private final AtomicReference<String> identifier;
 -      private final Map<Connection, Connectable> destinations;
 -      private final Map<Relationship, Set<Connection>> connections;
 -      private final AtomicReference<Set<Relationship>> 
undefinedRelationshipsToTerminate;
 -      private final AtomicReference<List<Connection>> incomingConnectionsRef;
 -      private final ReentrantReadWriteLock rwLock;
 -      private final Lock readLock;
 -      private final Lock writeLock;
 -      private final AtomicBoolean isolated;
 -      private final AtomicBoolean lossTolerant;
 -      private final AtomicReference<ScheduledState> scheduledState;
 -      private final AtomicReference<String> comments;
 -      private final AtomicReference<String> name;
 -      private final AtomicReference<Position> position;
 -      private final AtomicReference<String> annotationData;
 -      private final AtomicReference<String> schedulingPeriod; // stored as 
string so it's presented to user as they entered it
 -      private final AtomicReference<String> yieldPeriod;
 -      private final AtomicReference<String> penalizationPeriod;
 -      private final AtomicReference<Map<String, String>> style;
 -      private final AtomicInteger concurrentTaskCount;
 -      private final AtomicLong yieldExpiration;
 -      private final AtomicLong schedulingNanos;
 -      private final boolean triggerWhenEmpty;
 -      private final boolean sideEffectFree;
 -      private final boolean triggeredSerially;
 -      private final boolean triggerWhenAnyDestinationAvailable;
 -      private final boolean eventDrivenSupported;
 -      private final boolean batchSupported;
 -      private final Requirement inputRequirement;
 -      private final ValidationContextFactory validationContextFactory;
 -      private final ProcessScheduler processScheduler;
 -      private long runNanos = 0L;
 -
 -      private SchedulingStrategy schedulingStrategy;  // guarded by 
read/write lock
 -
 -      @SuppressWarnings("deprecation")
 -      public StandardProcessorNode(final Processor processor, final String 
uuid, final ValidationContextFactory validationContextFactory,
 -              final ProcessScheduler scheduler, final 
ControllerServiceProvider controllerServiceProvider) {
 -              super(processor, uuid, validationContextFactory, 
controllerServiceProvider);
 -
 -              this.processor = processor;
 -              identifier = new AtomicReference<>(uuid);
 -              destinations = new HashMap<>();
 -              connections = new HashMap<>();
 -              incomingConnectionsRef = new 
AtomicReference<List<Connection>>(new ArrayList<Connection>());
 -              scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
 -              rwLock = new ReentrantReadWriteLock(false);
 -              readLock = rwLock.readLock();
 -              writeLock = rwLock.writeLock();
 -              lossTolerant = new AtomicBoolean(false);
 -              final Set<Relationship> emptySetOfRelationships = new 
HashSet<>();
 -              undefinedRelationshipsToTerminate = new 
AtomicReference<>(emptySetOfRelationships);
 -              comments = new AtomicReference<>("");
 -              name = new 
AtomicReference<>(processor.getClass().getSimpleName());
 -              schedulingPeriod = new AtomicReference<>("0 sec");
 -              schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS);
 -              yieldPeriod = new AtomicReference<>(DEFAULT_YIELD_PERIOD);
 -              yieldExpiration = new AtomicLong(0L);
 -              concurrentTaskCount = new AtomicInteger(1);
 -              position = new AtomicReference<>(new Position(0D, 0D));
 -              style = new AtomicReference<>(Collections.unmodifiableMap(new 
HashMap<String, String>()));
 -              this.processGroup = new AtomicReference<>();
 -              processScheduler = scheduler;
 -              annotationData = new AtomicReference<>();
 -              isolated = new AtomicBoolean(false);
 -              penalizationPeriod = new 
AtomicReference<>(DEFAULT_PENALIZATION_PERIOD);
 -
 -              final Class<?> procClass = processor.getClass();
 -              triggerWhenEmpty = 
procClass.isAnnotationPresent(TriggerWhenEmpty.class) || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerWhenEmpty.class);
 -              sideEffectFree = 
procClass.isAnnotationPresent(SideEffectFree.class) || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SideEffectFree.class);
 -              batchSupported = 
procClass.isAnnotationPresent(SupportsBatching.class) || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SupportsBatching.class);
 -              triggeredSerially = 
procClass.isAnnotationPresent(TriggerSerially.class) || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerSerially.class);
 -              triggerWhenAnyDestinationAvailable = 
procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class)
 -                      || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable.class);
 -              this.validationContextFactory = validationContextFactory;
 -              eventDrivenSupported = 
(procClass.isAnnotationPresent(EventDriven.class)
 -                      || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.EventDriven.class))
 && !triggeredSerially && !triggerWhenEmpty;
 -
 -              final boolean inputRequirementPresent = 
procClass.isAnnotationPresent(InputRequirement.class);
 -              if (inputRequirementPresent) {
 -                      inputRequirement = 
procClass.getAnnotation(InputRequirement.class).value();
 -              } else {
 -                      inputRequirement = Requirement.INPUT_ALLOWED;
 -              }
 -
 -              schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN;
 -      }
 -
 -      /**
 -       * @return comments about this specific processor instance
 -       */
 -      @Override
 -      public String getComments() {
 -              return comments.get();
 -      }
 -
 -      /**
 -       * Provides and opportunity to retain information about this particular 
processor instance
 -       *
 -       * @param comments new comments
 -       */
 -      @Override
 -      public void setComments(final String comments) {
 -              writeLock.lock();
 -              try {
 -                      if (isRunning()) {
 -                              throw new IllegalStateException("Cannot modify 
Processor configuration while the Processor is running");
 -                      }
 -                      this.comments.set(comments);
 -              } finally {
 -                      writeLock.unlock();
 -              }
 -      }
 -
 -      @Override
 -      public ScheduledState getScheduledState() {
 -              return scheduledState.get();
 -      }
 -
 -      @Override
 -      public Position getPosition() {
 -              return position.get();
 -      }
 -
 -      @Override
 -      public void setPosition(Position position) {
 -              this.position.set(position);
 -      }
 -
 -      @Override
 -      public Map<String, String> getStyle() {
 -              return style.get();
 -      }
 -
 -      @Override
 -      public void setStyle(final Map<String, String> style) {
 -              if (style != null) {
 -                      this.style.set(Collections.unmodifiableMap(new 
HashMap<>(style)));
 -              }
 -      }
 -
 -      @Override
 -      public String getIdentifier() {
 -              return identifier.get();
 -      }
 -
 -      /**
 -       * @return if true flow file content generated by this processor is 
considered loss tolerant
 -       */
 -      @Override
 -      public boolean isLossTolerant() {
 -              return lossTolerant.get();
 -      }
 -
 -      @Override
 -      public boolean isIsolated() {
 -              return isolated.get();
 -      }
 -
 -      /**
 -       * @return true if the processor has the {@link TriggerWhenEmpty} 
annotation, false otherwise.
 -       */
 -      @Override
 -      public boolean isTriggerWhenEmpty() {
 -              return triggerWhenEmpty;
 -      }
 -
 -      /**
 -       * @return true if the processor has the {@link SideEffectFree} 
annotation, false otherwise.
 -       */
 -      @Override
 -      public boolean isSideEffectFree() {
 -              return sideEffectFree;
 -      }
 -
 -      @Override
 -      public boolean isHighThroughputSupported() {
 -              return batchSupported;
 -      }
 -
 -      /**
 -       * @return true if the processor has the {@link 
TriggerWhenAnyDestinationAvailable} annotation, false otherwise.
 -       */
 -      @Override
 -      public boolean isTriggerWhenAnyDestinationAvailable() {
 -              return triggerWhenAnyDestinationAvailable;
 -      }
 -
 -      /**
 -       * Indicates whether flow file content made by this processor must be 
persisted
 -       *
 -       * @param lossTolerant tolerant
 -       */
 -      @Override
 -      public void setLossTolerant(final boolean lossTolerant) {
 -              writeLock.lock();
 -              try {
 -                      if (isRunning()) {
 -                              throw new IllegalStateException("Cannot modify 
Processor configuration while the Processor is running");
 -                      }
 -                      this.lossTolerant.set(lossTolerant);
 -              } finally {
 -                      writeLock.unlock();
 -              }
 -      }
 -
 -      /**
 -       * Indicates whether the processor runs on only the primary node.
 -       *
 -       * @param isolated isolated
 -       */
 -      public void setIsolated(final boolean isolated) {
 -              writeLock.lock();
 -              try {
 -                      if (isRunning()) {
 -                              throw new IllegalStateException("Cannot modify 
Processor configuration while the Processor is running");
 -                      }
 -                      this.isolated.set(isolated);
 -              } finally {
 -                      writeLock.unlock();
 -              }
 -      }
 -
 -      @Override
 -      public boolean isAutoTerminated(final Relationship relationship) {
 -              final Set<Relationship> terminatable = 
undefinedRelationshipsToTerminate.get();
 -              if (terminatable == null) {
 -                      return false;
 -              }
 -              return terminatable.contains(relationship);
 -      }
 -
 -      @Override
 -      public void setAutoTerminatedRelationships(final Set<Relationship> 
terminate) {
 -              writeLock.lock();
 -              try {
 -                      if (isRunning()) {
 -                              throw new IllegalStateException("Cannot modify 
Processor configuration while the Processor is running");
 -                      }
 -
 -                      for (final Relationship rel : terminate) {
 -                              if (!getConnections(rel).isEmpty()) {
 -                                      throw new IllegalStateException("Cannot 
mark relationship '" + rel.getName() + "' as auto-terminated because Connection 
already exists with this relationship");
 -                              }
 -                      }
 -                      undefinedRelationshipsToTerminate.set(new 
HashSet<>(terminate));
 -              } finally {
 -                      writeLock.unlock();
 -              }
 -      }
 -
 -      /**
 -       * @return an unmodifiable Set that contains all of the 
ProcessorRelationship objects that are configured to be auto-terminated
 -       */
 -      @Override
 -      public Set<Relationship> getAutoTerminatedRelationships() {
 -              Set<Relationship> relationships = 
undefinedRelationshipsToTerminate.get();
 -              if (relationships == null) {
 -                      relationships = new HashSet<>();
 -              }
 -              return Collections.unmodifiableSet(relationships);
 -      }
 -
 -      @Override
 -      public String getName() {
 -              return name.get();
 -      }
 -
 -      /**
 -       * @return the value of the processor's {@link CapabilityDescription} 
annotation, if one exists, else <code>null</code>.
 -       */
 -      @SuppressWarnings("deprecation")
 -      public String getProcessorDescription() {
 -              CapabilityDescription capDesc = 
processor.getClass().getAnnotation(CapabilityDescription.class);
 -              String description = null;
 -              if (capDesc != null) {
 -                      description = capDesc.value();
 -              } else {
 -                      final 
org.apache.nifi.processor.annotation.CapabilityDescription deprecatedCapDesc
 -                      = 
processor.getClass().getAnnotation(org.apache.nifi.processor.annotation.CapabilityDescription.class);
 -                      if (deprecatedCapDesc != null) {
 -                              description = deprecatedCapDesc.value();
 -                      }
 -              }
 -
 -              return description;
 -      }
 -
 -      @Override
 -      public void setName(final String name) {
 -              writeLock.lock();
 -              try {
 -                      if (isRunning()) {
 -                              throw new IllegalStateException("Cannot modify 
Processor configuration while the Processor is running");
 -                      }
 -                      this.name.set(name);
 -              } finally {
 -                      writeLock.unlock();
 -              }
 -      }
 -
 -      /**
 -       * @param timeUnit determines the unit of time to represent the 
scheduling period. If null will be reported in units of {@link 
#DEFAULT_SCHEDULING_TIME_UNIT}
 -       * @return the schedule period that should elapse before subsequent 
cycles of this processor's tasks
 -       */
 -      @Override
 -      public long getSchedulingPeriod(final TimeUnit timeUnit) {
 -              return timeUnit.convert(schedulingNanos.get(), 
TimeUnit.NANOSECONDS);
 -      }
 -
 -      @Override
 -      public boolean isEventDrivenSupported() {
 -              readLock.lock();
 -              try {
 -                      return this.eventDrivenSupported;
 -              } finally {
 -                      readLock.unlock();
 -              }
 -      }
 -
 -      /**
 -       * Updates the Scheduling Strategy used for this Processor
 -       *
 -       * @param schedulingStrategy strategy
 -       *
 -       * @throws IllegalArgumentException if the SchedulingStrategy is not 
not applicable for this Processor
 -       */
 -      @Override
 -      public void setSchedulingStrategy(final SchedulingStrategy 
schedulingStrategy) {
 -              writeLock.lock();
 -              try {
 -                      if (schedulingStrategy == 
SchedulingStrategy.EVENT_DRIVEN && !eventDrivenSupported) {
 -                              // not valid. Just ignore it. We don't throw an 
Exception because if a developer changes a Processor so that
 -                              // it no longer supports EventDriven mode, we 
don't want the app to fail to startup if it was already in Event-Driven
 -                              // Mode. Instead, we will simply leave it in 
Timer-Driven mode
 -                              return;
 -                      }
 -
 -                      this.schedulingStrategy = schedulingStrategy;
 -                      setIsolated(schedulingStrategy == 
SchedulingStrategy.PRIMARY_NODE_ONLY);
 -              } finally {
 -                      writeLock.unlock();
 -              }
 -      }
 -
 -      /**
 -       * @return the currently configured scheduling strategy
 -       */
 -      @Override
 -      public SchedulingStrategy getSchedulingStrategy() {
 -              readLock.lock();
 -              try {
 -                      return this.schedulingStrategy;
 -              } finally {
 -                      readLock.unlock();
 -              }
 -      }
 -
 -      @Override
 -      public String getSchedulingPeriod() {
 -              return schedulingPeriod.get();
 -      }
 -
 -      @Override
 -      public void setScheduldingPeriod(final String schedulingPeriod) {
 -              writeLock.lock();
 -              try {
 -                      if (isRunning()) {
 -                              throw new IllegalStateException("Cannot modify 
Processor configuration while the Processor is running");
 -                      }
 -
 -                      switch (schedulingStrategy) {
 -                              case CRON_DRIVEN: {
 -                                      try {
 -                                              new 
CronExpression(schedulingPeriod);
 -                                      } catch (final Exception e) {
 -                                              throw new 
IllegalArgumentException("Scheduling Period is not a valid cron expression: " + 
schedulingPeriod);
 -                                      }
 -                              }
 -                              break;
 -                              case PRIMARY_NODE_ONLY:
 -                              case TIMER_DRIVEN: {
 -                                      final long schedulingNanos = 
FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), 
TimeUnit.NANOSECONDS);
 -                                      if (schedulingNanos < 0) {
 -                                              throw new 
IllegalArgumentException("Scheduling Period must be positive");
 -                                      }
 -                                      
this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos));
 -                              }
 -                              break;
 -                              case EVENT_DRIVEN:
 -                              default:
 -                                      return;
 -                      }
 -
 -                      this.schedulingPeriod.set(schedulingPeriod);
 -              } finally {
 -                      writeLock.unlock();
 -              }
 -      }
 -
 -      @Override
 -      public long getRunDuration(final TimeUnit timeUnit) {
 -              readLock.lock();
 -              try {
 -                      return timeUnit.convert(this.runNanos, 
TimeUnit.NANOSECONDS);
 -              } finally {
 -                      readLock.unlock();
 -              }
 -      }
 -
 -      @Override
 -      public void setRunDuration(final long duration, final TimeUnit 
timeUnit) {
 -              writeLock.lock();
 -              try {
 -                      if (duration < 0) {
 -                              throw new IllegalArgumentException("Run 
Duration must be non-negative value; cannot set to " + 
timeUnit.toSeconds(duration) + " seconds");
 -                      }
 -
 -                      this.runNanos = timeUnit.toNanos(duration);
 -              } finally {
 -                      writeLock.unlock();
 -              }
 -      }
 -
 -      @Override
 -      public long getYieldPeriod(final TimeUnit timeUnit) {
 -              return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit 
== null ? DEFAULT_TIME_UNIT : timeUnit);
 -      }
 -
 -      @Override
 -      public String getYieldPeriod() {
 -              return yieldPeriod.get();
 -      }
 -
 -      @Override
 -      public void setYieldPeriod(final String yieldPeriod) {
 -              writeLock.lock();
 -              try {
 -                      if (isRunning()) {
 -                              throw new IllegalStateException("Cannot modify 
Processor configuration while the Processor is running");
 -                      }
 -                      final long yieldMillis = 
FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS);
 -                      if (yieldMillis < 0) {
 -                              throw new IllegalArgumentException("Yield 
duration must be positive");
 -                      }
 -                      this.yieldPeriod.set(yieldPeriod);
 -              } finally {
 -                      writeLock.unlock();
 -              }
 -      }
 -
 -      /**
 -       * Causes the processor not to be scheduled for some period of time. 
This duration can be obtained and set via the {@link #getYieldPeriod(TimeUnit)} 
and {@link #setYieldPeriod(long, TimeUnit)}
 -       * methods.
 -       */
 -      @Override
 -      public void yield() {
 -              final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS);
 -              yield(yieldMillis, TimeUnit.MILLISECONDS);
 -
 -              final String yieldDuration = (yieldMillis > 1000) ? 
(yieldMillis / 1000) + " seconds" : yieldMillis + " milliseconds";
 -              LoggerFactory.getLogger(processor.getClass()).debug("{} has 
chosen to yield its resources; will not be scheduled to run again for {}", 
processor, yieldDuration);
 -      }
 -
 -      @Override
 -      public void yield(final long period, final TimeUnit timeUnit) {
 -              final long yieldMillis = TimeUnit.MILLISECONDS.convert(period, 
timeUnit);
 -              yieldExpiration.set(Math.max(yieldExpiration.get(), 
System.currentTimeMillis() + yieldMillis));
 -
 -              processScheduler.yield(this);
 -      }
 -
 -      /**
 -       * @return the number of milliseconds since Epoch at which time this 
processor is to once again be scheduled.
 -       */
 -      @Override
 -      public long getYieldExpiration() {
 -              return yieldExpiration.get();
 -      }
 -
 -      @Override
 -      public long getPenalizationPeriod(final TimeUnit timeUnit) {
 -              return FormatUtils.getTimeDuration(getPenalizationPeriod(), 
timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit);
 -      }
 -
 -      @Override
 -      public String getPenalizationPeriod() {
 -              return penalizationPeriod.get();
 -      }
 -
 -      @Override
 -      public void setPenalizationPeriod(final String penalizationPeriod) {
 -              writeLock.lock();
 -              try {
 -                      if (isRunning()) {
 -                              throw new IllegalStateException("Cannot modify 
Processor configuration while the Processor is running");
 -                      }
 -                      final long penalizationMillis = 
FormatUtils.getTimeDuration(requireNonNull(penalizationPeriod), 
TimeUnit.MILLISECONDS);
 -                      if (penalizationMillis < 0) {
 -                              throw new 
IllegalArgumentException("Penalization duration must be positive");
 -                      }
 -                      this.penalizationPeriod.set(penalizationPeriod);
 -              } finally {
 -                      writeLock.unlock();
 -              }
 -      }
 -
 -      /**
 -       * Determines the number of concurrent tasks that may be running for 
this processor.
 -       *
 -       * @param taskCount a number of concurrent tasks this processor may 
have running
 -       * @throws IllegalArgumentException if the given value is less than 1
 -       */
 -      @Override
 -      public void setMaxConcurrentTasks(final int taskCount) {
 -              writeLock.lock();
 -              try {
 -                      if (isRunning()) {
 -                              throw new IllegalStateException("Cannot modify 
Processor configuration while the Processor is running");
 -                      }
 -                      if (taskCount < 1 && getSchedulingStrategy() != 
SchedulingStrategy.EVENT_DRIVEN) {
 -                              throw new IllegalArgumentException();
 -                      }
 -                      if (!triggeredSerially) {
 -                              concurrentTaskCount.set(taskCount);
 -                      }
 -              } finally {
 -                      writeLock.unlock();
 -              }
 -      }
 -
 -      @Override
 -      public boolean isTriggeredSerially() {
 -              return triggeredSerially;
 -      }
 -
 -      /**
 -       * @return the number of tasks that may execute concurrently for this 
processor
 -       */
 -      @Override
 -      public int getMaxConcurrentTasks() {
 -              return concurrentTaskCount.get();
 -      }
 -
 -      @Override
 -      public LogLevel getBulletinLevel() {
 -              return 
LogRepositoryFactory.getRepository(getIdentifier()).getObservationLevel(BULLETIN_OBSERVER_ID);
 -      }
 -
 -      @Override
 -      public void setBulletinLevel(final LogLevel level) {
 -              
LogRepositoryFactory.getRepository(getIdentifier()).setObservationLevel(BULLETIN_OBSERVER_ID,
 level);
 -      }
 -
 -      @Override
 -      public Set<Connection> getConnections() {
 -              final Set<Connection> allConnections = new HashSet<>();
 -              readLock.lock();
 -              try {
 -                      for (final Set<Connection> connectionSet : 
connections.values()) {
 -                              allConnections.addAll(connectionSet);
 -                      }
 -              } finally {
 -                      readLock.unlock();
 -              }
 -
 -              return allConnections;
 -      }
 -
 -      @Override
 -      public List<Connection> getIncomingConnections() {
 -              return incomingConnectionsRef.get();
 -      }
 -
 -      @Override
 -      public Set<Connection> getConnections(final Relationship relationship) {
 -              final Set<Connection> applicableConnections;
 -              readLock.lock();
 -              try {
 -                      applicableConnections = connections.get(relationship);
 -              } finally {
 -                      readLock.unlock();
 -              }
 -              return (applicableConnections == null) ? 
Collections.<Connection>emptySet() : 
Collections.unmodifiableSet(applicableConnections);
 -      }
 -
 -      @Override
 -      public void addConnection(final Connection connection) {
 -              Objects.requireNonNull(connection, "connection cannot be null");
 -
 -              if (!connection.getSource().equals(this) && 
!connection.getDestination().equals(this)) {
 -                      throw new IllegalStateException("Cannot a connection to 
a ProcessorNode for which the ProcessorNode is neither the Source nor the 
Destination");
 -              }
 -
 -              writeLock.lock();
 -              try {
 -                      List<Connection> updatedIncoming = null;
 -                      if (connection.getDestination().equals(this)) {
 -                              // don't add the connection twice. This may 
occur if we have a self-loop because we will be told
 -                              // to add the connection once because we are 
the source and again because we are the destination.
 -                              final List<Connection> incomingConnections = 
incomingConnectionsRef.get();
 -                              updatedIncoming = new 
ArrayList<>(incomingConnections);
 -                              if (!updatedIncoming.contains(connection)) {
 -                                      updatedIncoming.add(connection);
 -                              }
 -                      }
 -
 -                      if (connection.getSource().equals(this)) {
 -                              // don't add the connection twice. This may 
occur if we have a self-loop because we will be told
 -                              // to add the connection once because we are 
the source and again because we are the destination.
 -                              if (!destinations.containsKey(connection)) {
 -                                      for (final Relationship relationship : 
connection.getRelationships()) {
 -                                              final Relationship rel = 
getRelationship(relationship.getName());
 -                                              Set<Connection> set = 
connections.get(rel);
 -                                              if (set == null) {
 -                                                      set = new HashSet<>();
 -                                                      connections.put(rel, 
set);
 -                                              }
 -
 -                                              set.add(connection);
 -
 -                                              destinations.put(connection, 
connection.getDestination());
 -                                      }
 -
 -                                      final Set<Relationship> autoTerminated 
= this.undefinedRelationshipsToTerminate.get();
 -                                      if (autoTerminated != null) {
 -                                              
autoTerminated.removeAll(connection.getRelationships());
 -                                              
this.undefinedRelationshipsToTerminate.set(autoTerminated);
 -                                      }
 -                              }
 -                      }
 -
 -                      if (updatedIncoming != null) {
 -                              
incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
 -                      }
 -              } finally {
 -                      writeLock.unlock();
 -              }
 -      }
 -
 -      @Override
 -      public boolean hasIncomingConnection() {
 -              return !incomingConnectionsRef.get().isEmpty();
 -      }
 -
 -      @Override
 -      public void updateConnection(final Connection connection) throws 
IllegalStateException {
 -              if (requireNonNull(connection).getSource().equals(this)) {
 -                      writeLock.lock();
 -                      try {
 -                              //
 -                              // update any relationships
 -                              //
 -                              // first check if any relations were removed.
 -                              final List<Relationship> existingRelationships 
= new ArrayList<>();
 -                              for (final Map.Entry<Relationship, 
Set<Connection>> entry : connections.entrySet()) {
 -                                      if 
(entry.getValue().contains(connection)) {
 -                                              
existingRelationships.add(entry.getKey());
 -                                      }
 -                              }
 -
 -                              for (final Relationship rel : 
connection.getRelationships()) {
 -                                      if 
(!existingRelationships.contains(rel)) {
 -                                              // relationship was removed. 
Check if this is legal.
 -                                              final Set<Connection> 
connectionsForRelationship = getConnections(rel);
 -                                              if (connectionsForRelationship 
!= null && connectionsForRelationship.size() == 1 && this.isRunning() && 
!isAutoTerminated(rel) && getRelationships().contains(rel)) {
 -                                                      // if we are running 
and we do not terminate undefined relationships and this is the only
 -                                                      // connection that 
defines the given relationship, and that relationship is required,
 -                                                      // then it is not legal 
to remove this relationship from this connection.
 -                                                      throw new 
IllegalStateException("Cannot remove relationship " + rel.getName() + " from 
Connection because doing so would invalidate Processor "
 -                                                              + this + ", 
which is currently running");
 -                                              }
 -                                      }
 -                              }
 -
 -                              // remove the connection from any list that 
currently contains
 -                              for (final Set<Connection> list : 
connections.values()) {
 -                                      list.remove(connection);
 -                              }
 -
 -                              // add the connection in for all relationships 
listed.
 -                              for (final Relationship rel : 
connection.getRelationships()) {
 -                                      Set<Connection> set = 
connections.get(rel);
 -                                      if (set == null) {
 -                                              set = new HashSet<>();
 -                                              connections.put(rel, set);
 -                                      }
 -                                      set.add(connection);
 -                              }
 -
 -                              // update to the new destination
 -                              destinations.put(connection, 
connection.getDestination());
 -
 -                              final Set<Relationship> autoTerminated = 
this.undefinedRelationshipsToTerminate.get();
 -                              if (autoTerminated != null) {
 -                                      
autoTerminated.removeAll(connection.getRelationships());
 -                                      
this.undefinedRelationshipsToTerminate.set(autoTerminated);
 -                              }
 -                      } finally {
 -                              writeLock.unlock();
 -                      }
 -              }
 -
 -              if (connection.getDestination().equals(this)) {
 -                      writeLock.lock();
 -                      try {
 -                              // update our incoming connections -- we can 
just remove & re-add the connection to
 -                              // update the list.
 -                              final List<Connection> incomingConnections = 
incomingConnectionsRef.get();
 -                              final List<Connection> updatedIncoming = new 
ArrayList<>(incomingConnections);
 -                              updatedIncoming.remove(connection);
 -                              updatedIncoming.add(connection);
 -                              
incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
 -                      } finally {
 -                              writeLock.unlock();
 -                      }
 -              }
 -      }
 -
 -      @Override
 -      public void removeConnection(final Connection connection) {
 -              boolean connectionRemoved = false;
 -
 -              if (requireNonNull(connection).getSource().equals(this)) {
 -                      for (final Relationship relationship : 
connection.getRelationships()) {
 -                              final Set<Connection> 
connectionsForRelationship = getConnections(relationship);
 -                              if ((connectionsForRelationship == null || 
connectionsForRelationship.size() <= 1) && isRunning()) {
 -                                      throw new IllegalStateException("This 
connection cannot be removed because its source is running and removing it will 
invalidate this processor");
 -                              }
 -                      }
 -
 -                      writeLock.lock();
 -                      try {
 -                              for (final Set<Connection> connectionList : 
this.connections.values()) {
 -                                      connectionList.remove(connection);
 -                              }
 -
 -                              connectionRemoved = 
(destinations.remove(connection) != null);
 -                      } finally {
 -                              writeLock.unlock();
 -                      }
 -              }
 -
 -              if (connection.getDestination().equals(this)) {
 -                      writeLock.lock();
 -                      try {
 -                              final List<Connection> incomingConnections = 
incomingConnectionsRef.get();
 -                              if (incomingConnections.contains(connection)) {
 -                                      final List<Connection> updatedIncoming 
= new ArrayList<>(incomingConnections);
 -                                      updatedIncoming.remove(connection);
 -                                      
incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
 -                                      return;
 -                              }
 -                      } finally {
 -                              writeLock.unlock();
 -                      }
 -              }
 -
 -              if (!connectionRemoved) {
 -                      throw new IllegalArgumentException("Cannot remove a 
connection from a ProcessorNode for which the ProcessorNode is not the Source");
 -              }
 -      }
 -
 -      /**
 -       * @param relationshipName name
 -       * @return the relationship for this nodes processor for the given name 
or creates a new relationship for the given name
 -       */
 -      @Override
 -      public Relationship getRelationship(final String relationshipName) {
 -              final Relationship specRel = new 
Relationship.Builder().name(relationshipName).build();
 -              Relationship returnRel = specRel;
 -
 -              final Set<Relationship> relationships;
 -              try (final NarCloseable narCloseable = 
NarCloseable.withNarLoader()) {
 -                      relationships = processor.getRelationships();
 -              }
 -
 -              for (final Relationship rel : relationships) {
 -                      if (rel.equals(specRel)) {
 -                              returnRel = rel;
 -                              break;
 -                      }
 -              }
 -              return returnRel;
 -      }
 -
 -      @Override
 -      public Processor getProcessor() {
 -              return this.processor;
 -      }
 -
 -      /**
 -       * @return the Set of destination processors for all relationships 
excluding any destinations that are this processor itself (self-loops)
 -       */
 -      public Set<Connectable> getDestinations() {
 -              final Set<Connectable> nonSelfDestinations = new HashSet<>();
 -              readLock.lock();
 -              try {
 -                      for (final Connectable connectable : 
destinations.values()) {
 -                              if (connectable != this) {
 -                                      nonSelfDestinations.add(connectable);
 -                              }
 -                      }
 -              } finally {
 -                      readLock.unlock();
 -              }
 -              return nonSelfDestinations;
 -      }
 -
 -      public Set<Connectable> getDestinations(final Relationship 
relationship) {
 -              readLock.lock();
 -              try {
 -                      final Set<Connectable> destinationSet = new HashSet<>();
 -                      final Set<Connection> relationshipConnections = 
connections.get(relationship);
 -                      if (relationshipConnections != null) {
 -                              for (final Connection connection : 
relationshipConnections) {
 -                                      
destinationSet.add(destinations.get(connection));
 -                              }
 -                      }
 -                      return destinationSet;
 -              } finally {
 -                      readLock.unlock();
 -              }
 -      }
 -
 -      public Set<Relationship> getUndefinedRelationships() {
 -              final Set<Relationship> undefined = new HashSet<>();
 -              readLock.lock();
 -              try {
 -                      final Set<Relationship> relationships;
 -                      try (final NarCloseable narCloseable = 
NarCloseable.withNarLoader()) {
 -                              relationships = processor.getRelationships();
 -                      }
 -
 -                      if (relationships == null) {
 -                              return undefined;
 -                      }
 -                      for (final Relationship relation : relationships) {
 -                              final Set<Connection> connectionSet = 
this.connections.get(relation);
 -                              if (connectionSet == null || 
connectionSet.isEmpty()) {
 -                                      undefined.add(relation);
 -                              }
 -                      }
 -              } finally {
 -                      readLock.unlock();
 -              }
 -              return undefined;
 -      }
 -
 -      /**
 -       * Determines if the given node is a destination for this node
 -       *
 -       * @param node node
 -       * @return true if is a direct destination node; false otherwise
 -       */
 -      boolean isRelated(final ProcessorNode node) {
 -              readLock.lock();
 -              try {
 -                      return this.destinations.containsValue(node);
 -              } finally {
 -                      readLock.unlock();
 -              }
 -      }
 -
 -      @Override
 -      public boolean isRunning() {
 -              readLock.lock();
 -              try {
 -                      return 
getScheduledState().equals(ScheduledState.RUNNING) || 
processScheduler.getActiveThreadCount(this) > 0;
 -              } finally {
 -                      readLock.unlock();
 -              }
 -      }
 -
 -      @Override
 -      public int getActiveThreadCount() {
 -              readLock.lock();
 -              try {
 -                      return processScheduler.getActiveThreadCount(this);
 -              } finally {
 -                      readLock.unlock();
 -              }
 -      }
 -
 -      @Override
 -      public boolean isValid() {
 -              readLock.lock();
 -              try {
 -                      final ValidationContext validationContext = 
validationContextFactory.newValidationContext(getProperties(), 
getAnnotationData());
 -
 -                      final Collection<ValidationResult> validationResults;
 -                      try (final NarCloseable narCloseable = 
NarCloseable.withNarLoader()) {
 -                              validationResults = 
getProcessor().validate(validationContext);
 -                      }
 -
 -                      for (final ValidationResult result : validationResults) 
{
 -                              if (!result.isValid()) {
 -                                      return false;
 -                              }
 -                      }
 -
 -                      for (final Relationship undef : 
getUndefinedRelationships()) {
 -                              if (!isAutoTerminated(undef)) {
 -                                      return false;
 -                              }
 -                      }
 -
 -                      switch (getInputRequirement()) {
 -                              case INPUT_ALLOWED:
 -                                      break;
 -                              case INPUT_FORBIDDEN: {
 -                                      if 
(!getIncomingConnections().isEmpty()) {
 -                                              return false;
 -                                      }
 -                                      break;
 -                              }
 -                              case INPUT_REQUIRED: {
 -                                      if (getIncomingConnections().isEmpty()) 
{
 -                                              return false;
 -                                      }
 -                                      break;
 -                              }
 -                      }
 -              } catch (final Throwable t) {
 -                      return false;
 -              } finally {
 -                      readLock.unlock();
 -              }
 -
 -              return true;
 -      }
 -
 -      @Override
 -      public Collection<ValidationResult> getValidationErrors() {
 -              final List<ValidationResult> results = new ArrayList<>();
 -              readLock.lock();
 -              try {
 -                      final ValidationContext validationContext = 
validationContextFactory.newValidationContext(getProperties(), 
getAnnotationData());
 -
 -                      final Collection<ValidationResult> validationResults;
 -                      try (final NarCloseable narCloseable = 
NarCloseable.withNarLoader()) {
 -                              validationResults = 
getProcessor().validate(validationContext);
 -                      }
 -
 -                      for (final ValidationResult result : validationResults) 
{
 -                              if (!result.isValid()) {
 -                                      results.add(result);
 -                              }
 -                      }
 -
 -                      for (final Relationship relationship : 
getUndefinedRelationships()) {
 -                              if (!isAutoTerminated(relationship)) {
 -                                      final ValidationResult error = new 
ValidationResult.Builder()
 -                                              .explanation("Relationship '" + 
relationship.getName() + "' is not connected to any component and is not 
auto-terminated")
 -                                              .subject("Relationship " + 
relationship.getName())
 -                                              .valid(false)
 -                                              .build();
 -                                      results.add(error);
 -                              }
 -                      }
 -
 -                      switch (getInputRequirement()) {
 -                              case INPUT_ALLOWED:
 -                                      break;
 -                              case INPUT_FORBIDDEN: {
 -                                      final int incomingConnCount = 
getIncomingConnections().size();
 -                                      if (incomingConnCount != 0) {
 -                                              results.add(new 
ValidationResult.Builder()
 -                                                      .explanation("Processor 
does not accept Incoming Connections but is currently configured with " + 
incomingConnCount + " Incoming Connections")
 -                                                      .subject("Incoming 
Connections")
 -                                                      .valid(false)
 -                                                      .build());
 -                                      }
 -                                      break;
 -                              }
 -                              case INPUT_REQUIRED: {
 -                                      if (getIncomingConnections().isEmpty()) 
{
 -                                              results.add(new 
ValidationResult.Builder()
 -                                                      .explanation("Processor 
required at least one Incoming Connection in order to perform its function but 
currently has no Incoming Connection")
 -                                                      .subject("Incoming 
Connections")
 -                                                      .valid(false)
 -                                                      .build());
 -                                      }
 -                                      break;
 -                              }
 -                      }
 -              } catch (final Throwable t) {
 -                      results.add(new 
ValidationResult.Builder().explanation("Failed to run validation due to " + 
t.toString()).valid(false).build());
 -              } finally {
 -                      readLock.unlock();
 -              }
 -              return results;
 -      }
 -
 -      @Override
 -      public Requirement getInputRequirement() {
 -              return inputRequirement;
 -      }
 -
 -      /**
 -       * Establishes node equality (based on the processor's identifier)
 -       *
 -       * @param other node
 -       * @return true if equal
 -       */
 -      @Override
 -      public boolean equals(final Object other) {
 -              if (!(other instanceof ProcessorNode)) {
 -                      return false;
 -              }
 -              final ProcessorNode on = (ProcessorNode) other;
 -              return new EqualsBuilder().append(identifier.get(), 
on.getIdentifier()).isEquals();
 -      }
 -
 -      @Override
 -      public int hashCode() {
 -              return new HashCodeBuilder(7, 
67).append(identifier).toHashCode();
 -      }
 -
 -      @Override
 -      public Collection<Relationship> getRelationships() {
 -              try (final NarCloseable narCloseable = 
NarCloseable.withNarLoader()) {
 -                      return getProcessor().getRelationships();
 -              }
 -      }
 -
 -      @Override
 -      public String toString() {
 -              try (final NarCloseable narCloseable = 
NarCloseable.withNarLoader()) {
 -                      return getProcessor().toString();
 -              }
 -      }
 -
 -      @Override
 -      public ProcessGroup getProcessGroup() {
 -              return processGroup.get();
 -      }
 -
 -      @Override
 -      public void setProcessGroup(final ProcessGroup group) {
 -              writeLock.lock();
 -              try {
 -                      this.processGroup.set(group);
 -              } finally {
 -                      writeLock.unlock();
 -              }
 -      }
 -
 -      @Override
 -      public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
 -              try (final NarCloseable narCloseable = 
NarCloseable.withNarLoader()) {
 -                      processor.onTrigger(context, sessionFactory);
 -              }
 -      }
 -
 -      @Override
 -      public ConnectableType getConnectableType() {
 -              return ConnectableType.PROCESSOR;
 -      }
 -
 -      @Override
 -      public void setScheduledState(final ScheduledState scheduledState) {
 -              this.scheduledState.set(scheduledState);
 -              if (!scheduledState.equals(ScheduledState.RUNNING)) {   // if 
user stops processor, clear yield expiration
 -                      yieldExpiration.set(0L);
 -              }
 -      }
 -
 -      @Override
 -      public void setAnnotationData(final String data) {
 -              writeLock.lock();
 -              try {
 -                      if (isRunning()) {
 -                              throw new IllegalStateException("Cannot set 
AnnotationData while processor is running");
 -                      }
 -
 -                      this.annotationData.set(data);
 -              } finally {
 -                      writeLock.unlock();
 -              }
 -      }
 -
 -      @Override
 -      public String getAnnotationData() {
 -              return annotationData.get();
 -      }
 -
 -      @Override
 -      public Collection<ValidationResult> validate(final ValidationContext 
validationContext) {
 -              return getValidationErrors();
 -      }
 -
 -      @Override
 -      public void verifyCanDelete() throws IllegalStateException {
 -              verifyCanDelete(false);
 -      }
 -
 -      @Override
 -      public void verifyCanDelete(final boolean ignoreConnections) {
 -              readLock.lock();
 -              try {
 -                      if (isRunning()) {
 -                              throw new IllegalStateException(this + " is 
running");
 -                      }
 -
 -                      if (!ignoreConnections) {
 -                              for (final Set<Connection> connectionSet : 
connections.values()) {
 -                                      for (final Connection connection : 
connectionSet) {
 -                                              connection.verifyCanDelete();
 -                                      }
 -                              }
 -
 -                              for (final Connection connection : 
incomingConnectionsRef.get()) {
 -                                      if 
(connection.getSource().equals(this)) {
 -                                              connection.verifyCanDelete();
 -                                      } else {
 -                                              throw new 
IllegalStateException(this + " is the destination of another component");
 -                                      }
 -                              }
 -                      }
 -              } finally {
 -                      readLock.unlock();
 -              }
 -      }
 -
 -      @Override
 -      public void verifyCanStart() {
 -              readLock.lock();
 -              try {
 -                      switch (getScheduledState()) {
 -                              case DISABLED:
 -                                      throw new IllegalStateException(this + 
" cannot be started because it is disabled");
 -                              case RUNNING:
 -                                      throw new IllegalStateException(this + 
" cannot be started because it is already running");
 -                              case STOPPED:
 -                                      break;
 -                      }
 -                      verifyNoActiveThreads();
 -
 -                      if (!isValid()) {
 -                              throw new IllegalStateException(this + " is not 
in a valid state");
 -                      }
 -              } finally {
 -                      readLock.unlock();
 -              }
 -      }
 -
 -      @Override
 -      public void verifyCanStart(final Set<ControllerServiceNode> 
ignoredReferences) {
 -              switch (getScheduledState()) {
 -                      case DISABLED:
 -                              throw new IllegalStateException(this + " cannot 
be started because it is disabled");
 -                      case RUNNING:
 -                              throw new IllegalStateException(this + " cannot 
be started because it is already running");
 -                      case STOPPED:
 -                              break;
 -              }
 -              verifyNoActiveThreads();
 -
 -              final Set<String> ids = new HashSet<>();
 -              for (final ControllerServiceNode node : ignoredReferences) {
 -                      ids.add(node.getIdentifier());
 -              }
 -
 -              final Collection<ValidationResult> validationResults = 
getValidationErrors(ids);
 -              for (final ValidationResult result : validationResults) {
 -                      if (!result.isValid()) {
 -                              throw new IllegalStateException(this + " cannot 
be started because it is not valid: " + result);
 -                      }
 -              }
 -      }
 -
 -      @Override
 -      public void verifyCanStop() {
 -              if (getScheduledState() != ScheduledState.RUNNING) {
 -                      throw new IllegalStateException(this + " is not 
scheduled to run");
 -              }
 -      }
 -
 -      @Override
 -      public void verifyCanUpdate() {
 -              readLock.lock();
 -              try {
 -                      if (isRunning()) {
 -                              throw new IllegalStateException(this + " is not 
stopped");
 -                      }
 -              } finally {
 -                      readLock.unlock();
 -              }
 -      }
 -
 -      @Override
 -      public void verifyCanEnable() {
 -              readLock.lock();
 -              try {
 -                      if (getScheduledState() != ScheduledState.DISABLED) {
 -                              throw new IllegalStateException(this + " is not 
disabled");
 -                      }
 -
 -                      verifyNoActiveThreads();
 -              } finally {
 -                      readLock.unlock();
 -              }
 -      }
 -
 -      @Override
 -      public void verifyCanDisable() {
 -              readLock.lock();
 -              try {
 -                      if (getScheduledState() != ScheduledState.STOPPED) {
 -                              throw new IllegalStateException(this + " is not 
stopped");
 -                      }
 -                      verifyNoActiveThreads();
 -              } finally {
 -                      readLock.unlock();
 -              }
 -      }
 -
 -      private void verifyNoActiveThreads() throws IllegalStateException {
 -              final int threadCount = 
processScheduler.getActiveThreadCount(this);
 -              if (threadCount > 0) {
 -                      throw new IllegalStateException(this + " has " + 
threadCount + " threads still active");
 -              }
 -      }
 -
 -      @Override
 +    public static final String BULLETIN_OBSERVER_ID = "bulletin-observer";
 +
 +    public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
 +    public static final String DEFAULT_YIELD_PERIOD = "1 sec";
 +    public static final String DEFAULT_PENALIZATION_PERIOD = "30 sec";
 +    private final AtomicReference<ProcessGroup> processGroup;
 +    private final Processor processor;
 +    private final AtomicReference<String> identifier;
 +    private final Map<Connection, Connectable> destinations;
 +    private final Map<Relationship, Set<Connection>> connections;
 +    private final AtomicReference<Set<Relationship>> 
undefinedRelationshipsToTerminate;
 +    private final AtomicReference<List<Connection>> incomingConnectionsRef;
 +    private final ReentrantReadWriteLock rwLock;
 +    private final Lock readLock;
 +    private final Lock writeLock;
 +    private final AtomicBoolean isolated;
 +    private final AtomicBoolean lossTolerant;
 +    private final AtomicReference<ScheduledState> scheduledState;
 +    private final AtomicReference<String> comments;
 +    private final AtomicReference<String> name;
 +    private final AtomicReference<Position> position;
 +    private final AtomicReference<String> annotationData;
 +    private final AtomicReference<String> schedulingPeriod; // stored as 
string so it's presented to user as they entered it
 +    private final AtomicReference<String> yieldPeriod;
 +    private final AtomicReference<String> penalizationPeriod;
 +    private final AtomicReference<Map<String, String>> style;
 +    private final AtomicInteger concurrentTaskCount;
 +    private final AtomicLong yieldExpiration;
 +    private final AtomicLong schedulingNanos;
 +    private final boolean triggerWhenEmpty;
 +    private final boolean sideEffectFree;
 +    private final boolean triggeredSerially;
 +    private final boolean triggerWhenAnyDestinationAvailable;
 +    private final boolean eventDrivenSupported;
 +    private final boolean batchSupported;
 +    private final Requirement inputRequirement;
 +    private final ValidationContextFactory validationContextFactory;
 +    private final ProcessScheduler processScheduler;
 +    private long runNanos = 0L;
 +
 +    private SchedulingStrategy schedulingStrategy; // guarded by read/write 
lock
 +
 +    @SuppressWarnings("deprecation")
 +    public StandardProcessorNode(final Processor processor, final String 
uuid, final ValidationContextFactory validationContextFactory,
 +        final ProcessScheduler scheduler, final ControllerServiceProvider 
controllerServiceProvider) {
 +        super(processor, uuid, validationContextFactory, 
controllerServiceProvider);
 +
 +        this.processor = processor;
 +        identifier = new AtomicReference<>(uuid);
 +        destinations = new HashMap<>();
 +        connections = new HashMap<>();
 +        incomingConnectionsRef = new AtomicReference<List<Connection>>(new 
ArrayList<Connection>());
 +        scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
 +        rwLock = new ReentrantReadWriteLock(false);
 +        readLock = rwLock.readLock();
 +        writeLock = rwLock.writeLock();
 +        lossTolerant = new AtomicBoolean(false);
 +        final Set<Relationship> emptySetOfRelationships = new HashSet<>();
 +        undefinedRelationshipsToTerminate = new 
AtomicReference<>(emptySetOfRelationships);
 +        comments = new AtomicReference<>("");
 +        name = new AtomicReference<>(processor.getClass().getSimpleName());
 +        schedulingPeriod = new AtomicReference<>("0 sec");
 +        schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS);
 +        yieldPeriod = new AtomicReference<>(DEFAULT_YIELD_PERIOD);
 +        yieldExpiration = new AtomicLong(0L);
 +        concurrentTaskCount = new AtomicInteger(1);
 +        position = new AtomicReference<>(new Position(0D, 0D));
 +        style = new AtomicReference<>(Collections.unmodifiableMap(new 
HashMap<String, String>()));
 +        this.processGroup = new AtomicReference<>();
 +        processScheduler = scheduler;
 +        annotationData = new AtomicReference<>();
 +        isolated = new AtomicBoolean(false);
 +        penalizationPeriod = new 
AtomicReference<>(DEFAULT_PENALIZATION_PERIOD);
 +
 +        final Class<?> procClass = processor.getClass();
 +        triggerWhenEmpty = 
procClass.isAnnotationPresent(TriggerWhenEmpty.class) || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerWhenEmpty.class);
 +        sideEffectFree = procClass.isAnnotationPresent(SideEffectFree.class) 
|| 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SideEffectFree.class);
 +        batchSupported = 
procClass.isAnnotationPresent(SupportsBatching.class) || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SupportsBatching.class);
 +        triggeredSerially = 
procClass.isAnnotationPresent(TriggerSerially.class) || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerSerially.class);
 +        triggerWhenAnyDestinationAvailable = 
procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class)
 +            || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable.class);
 +        this.validationContextFactory = validationContextFactory;
 +        eventDrivenSupported = 
(procClass.isAnnotationPresent(EventDriven.class)
 +            || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.EventDriven.class))
 && !triggeredSerially && !triggerWhenEmpty;
 +
 +        final boolean inputRequirementPresent = 
procClass.isAnnotationPresent(InputRequirement.class);
 +        if (inputRequirementPresent) {
 +            inputRequirement = 
procClass.getAnnotation(InputRequirement.class).value();
 +        } else {
 +            inputRequirement = Requirement.INPUT_ALLOWED;
 +        }
 +
 +        schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN;
 +    }
 +
 +    /**
 +     * @return comments about this specific processor instance
 +     */
 +    @Override
 +    public String getComments() {
 +        return comments.get();
 +    }
 +
 +    /**
 +     * Provides and opportunity to retain information about this particular 
processor instance
 +     *
 +     * @param comments new comments
 +     */
 +    @Override
 +    public void setComments(final String comments) {
 +        writeLock.lock();
 +        try {
 +            if (isRunning()) {
 +                throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
 +            }
 +            this.comments.set(comments);
 +        } finally {
 +            writeLock.unlock();
 +        }
 +    }
 +
 +    @Override
 +    public ScheduledState getScheduledState() {
 +        return scheduledState.get();
 +    }
 +
 +    @Override
 +    public Position getPosition() {
 +        return position.get();
 +    }
 +
 +    @Override
 +    public void setPosition(Position position) {
 +        this.position.set(position);
 +    }
 +
 +    @Override
 +    public Map<String, String> getStyle() {
 +        return style.get();
 +    }
 +
 +    @Override
 +    public void setStyle(final Map<String, String> style) {
 +        if (style != null) {
 +            this.style.set(Collections.unmodifiableMap(new HashMap<>(style)));
 +        }
 +    }
 +
 +    @Override
 +    public String getIdentifier() {
 +        return identifier.get();
 +    }
 +
 +    /**
 +     * @return if true flow file content generated by this processor is 
considered loss tolerant
 +     */
 +    @Override
 +    public boolean isLossTolerant() {
 +        return lossTolerant.get();
 +    }
 +
 +    @Override
 +    public boolean isIsolated() {
 +        return isolated.get();
 +    }
 +
 +    /**
 +     * @return true if the processor has the {@link TriggerWhenEmpty} 
annotation, false otherwise.
 +     */
 +    @Override
 +    public boolean isTriggerWhenEmpty() {
 +        return triggerWhenEmpty;
 +    }
 +
 +    /**
 +     * @return true if the processor has the {@link SideEffectFree} 
annotation, false otherwise.
 +     */
 +    @Override
 +    public boolean isSideEffectFree() {
 +        return sideEffectFree;
 +    }
 +
 +    @Override
 +    public boolean isHighThroughputSupported() {
 +        return batchSupported;
 +    }
 +
 +    /**
 +     * @return true if the processor has the {@link 
TriggerWhenAnyDestinationAvailable} annotation, false otherwise.
 +     */
 +    @Override
 +    public boolean isTriggerWhenAnyDestinationAvailable() {
 +        return triggerWhenAnyDestinationAvailable;
 +    }
 +
 +    /**
 +     * Indicates whether flow file content made by this processor must be 
persisted
 +     *
 +     * @param lossTolerant tolerant
 +     */
 +    @Override
 +    public void setLossTolerant(final boolean lossTolerant) {
 +        writeLock.lock();
 +        try {
 +            if (isRunning()) {
 +                throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
 +            }
 +            this.lossTolerant.set(lossTolerant);
 +        } finally {
 +            writeLock.unlock();
 +        }
 +    }
 +
 +    /**
 +     * Indicates whether the processor runs on only the primary node.
 +     *
 +     * @param isolated isolated
 +     */
 +    public void setIsolated(final boolean isolated) {
 +        writeLock.lock();
 +        try {
 +            if (isRunning()) {
 +                throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
 +            }
 +            this.isolated.set(isolated);
 +        } finally {
 +            writeLock.unlock();
 +        }
 +    }
 +
 +    @Override
 +    public boolean isAutoTerminated(final Relationship relationship) {
 +        final Set<Relationship> terminatable = 
undefinedRelationshipsToTerminate.get();
 +        if (terminatable == null) {
 +            return false;
 +        }
 +        return terminatable.contains(relationship);
 +    }
 +
 +    @Override
 +    public void setAutoTerminatedRelationships(final Set<Relationship> 
terminate) {
 +        writeLock.lock();
 +        try {
 +            if (isRunning()) {
 +                throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
 +            }
 +
 +            for (final Relationship rel : terminate) {
 +                if (!getConnections(rel).isEmpty()) {
 +                    throw new IllegalStateException("Cannot mark relationship 
'" + rel.getName() + "' as auto-terminated because Connection already exists 
with this relationship");
 +                }
 +            }
 +            undefinedRelationshipsToTerminate.set(new HashSet<>(terminate));
 +        } finally {
 +            writeLock.unlock();
 +        }
 +    }
 +
 +    /**
 +     * @return an unmodifiable Set that contains all of the 
ProcessorRelationship objects that are configured to be auto-terminated
 +     */
 +    @Override
 +    public Set<Relationship> getAutoTerminatedRelationships() {
 +        Set<Relationship> relationships = 
undefinedRelationshipsToTerminate.get();
 +        if (relationships == null) {
 +            relationships = new HashSet<>();
 +        }
 +        return Collections.unmodifiableSet(relationships);
 +    }
 +
 +    @Override
 +    public String getName() {
 +        return name.get();
 +    }
 +
 +    /**
 +     * @return the value of the processor's {@link CapabilityDescription} 
annotation, if one exists, else <code>null</code>.
 +     */
 +    @SuppressWarnings("deprecation")
 +    public String getProcessorDescription() {
 +        CapabilityDescription capDesc = 
processor.getClass().getAnnotation(CapabilityDescription.class);
 +        String description = null;
 +        if (capDesc != null) {
 +            description = capDesc.value();
 +        } else {
 +            final org.apache.nifi.processor.annotation.CapabilityDescription 
deprecatedCapDesc = 
processor.getClass().getAnnotation(org.apache.nifi.processor.annotation.CapabilityDescription.class);
 +            if (deprecatedCapDesc != null) {
 +                description = deprecatedCapDesc.value();
 +            }
 +        }
 +
 +        return description;
 +    }
 +
 +    @Override
 +    public void setName(final String name) {
 +        writeLock.lock();
 +        try {
 +            if (isRunning()) {
 +                throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
 +            }
 +            this.name.set(name);
 +        } finally {
 +            writeLock.unlock();
 +        }
 +    }
 +
 +    /**
 +     * @param timeUnit determines the unit of time to represent the 
scheduling period. If null will be reported in units of {@link 
#DEFAULT_SCHEDULING_TIME_UNIT}
 +     * @return the schedule period that should elapse before subsequent 
cycles of this processor's tasks
 +     */
 +    @Override
 +    public long getSchedulingPeriod(final TimeUnit timeUnit) {
 +        return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS);
 +    }
 +
 +    @Override
 +    public boolean isEventDrivenSupported() {
 +        readLock.lock();
 +        try {
 +            return this.eventDrivenSupported;
 +        } finally {
 +            readLock.unlock();
 +        }
 +    }
 +
 +    /**
 +     * Updates the Scheduling Strategy used for this Processor
 +     *
 +     * @param schedulingStrategy strategy
 +     *
 +     * @throws IllegalArgumentException if the SchedulingStrategy is not not 
applicable for this Processor
 +     */
 +    @Override
 +    public void setSchedulingStrategy(final SchedulingStrategy 
schedulingStrategy) {
 +        writeLock.lock();
 +        try {
 +            if (schedulingStrategy == SchedulingStrategy.EVENT_DRIVEN && 
!eventDrivenSupported) {
 +                // not valid. Just ignore it. We don't throw an Exception 
because if a developer changes a Processor so that
 +                // it no longer supports EventDriven mode, we don't want the 
app to fail to startup if it was already in Event-Driven
 +                // Mode. Instead, we will simply leave it in Timer-Driven mode
 +                return;
 +            }
 +
 +            this.schedulingStrategy = schedulingStrategy;
 +            setIsolated(schedulingStrategy == 
SchedulingStrategy.PRIMARY_NODE_ONLY);
 +        } finally {
 +            writeLock.unlock();
 +        }
 +    }
 +
 +    /**
 +     * @return the currently configured scheduling strategy
 +     */
 +    @Override
 +    public SchedulingStrategy getSchedulingStrategy() {
 +        readLock.lock();
 +        try {
 +            return this.schedulingStrategy;
 +        } finally {
 +            readLock.unlock();
 +        }
 +    }
 +
 +    @Override
 +    public String getSchedulingPeriod() {
 +        return schedulingPeriod.get();
 +    }
 +
 +    @Override
 +    public void setScheduldingPeriod(final String schedulingPeriod) {
 +        writeLock.lock();
 +        try {
 +            if (isRunning()) {
 +                throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
 +            }
 +
 +            switch (schedulingStrategy) {
 +                case CRON_DRIVEN: {
 +                    try {
 +                        new CronExpression(schedulingPeriod);
 +                    } catch (final Exception e) {
 +                        throw new IllegalArgumentException("Scheduling Period 
is not a valid cron expression: " + schedulingPeriod);
 +                    }
 +                }
 +                    break;
 +                case PRIMARY_NODE_ONLY:
 +                case TIMER_DRIVEN: {
 +                    final long schedulingNanos = 
FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), 
TimeUnit.NANOSECONDS);
 +                    if (schedulingNanos < 0) {
 +                        throw new IllegalArgumentException("Scheduling Period 
must be positive");
 +                    }
 +                    
this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos));
 +                }
 +                    break;
 +                case EVENT_DRIVEN:
 +                default:
 +                    return;
 +            }
 +
 +            this.schedulingPeriod.set(schedulingPeriod);
 +        } finally {
 +            writeLock.unlock();
 +        }
 +    }
 +
 +    @Override
 +    public long getRunDuration(final TimeUnit timeUnit) {
 +        readLock.lock();
 +        try {
 +            return timeUnit.convert(this.runNanos, TimeUnit.NANOSECONDS);
 +        } finally {
 +            readLock.unlock();
 +        }
 +    }
 +
 +    @Override
 +    public void setRunDuration(final long duration, final TimeUnit timeUnit) {
 +        writeLock.lock();
 +        try {
 +            if (duration < 0) {
 +                throw new IllegalArgumentException("Run Duration must be 
non-negative value; cannot set to " + timeUnit.toSeconds(duration) + " 
seconds");
 +            }
 +
 +            this.runNanos = timeUnit.toNanos(duration);
 +        } finally {
 +            writeLock.unlock();
 +        }
 +    }
 +
 +    @Override
 +    public long getYieldPeriod(final TimeUnit timeUnit) {
 +        return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit == null 
? DEFAULT_TIME_UNIT : timeUnit);
 +    }
 +
 +    @Override
 +    public String getYieldPeriod() {
 +        return yieldPeriod.get();
 +    }
 +
 +    @Override
 +    public void setYieldPeriod(final String yieldPeriod) {
 +        writeLock.lock();
 +        try {
 +            if (isRunning()) {
 +                throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
 +            }
 +            final long yieldMillis = 
FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS);
 +            if (yieldMillis < 0) {
 +                throw new IllegalArgumentException("Yield duration must be 
positive");
 +            }
 +            this.yieldPeriod.set(yieldPeriod);
 +        } finally {
 +            writeLock.unlock();
 +        }
 +    }
 +
 +    /**
 +     * Causes the processor not to be scheduled for some period of time. This 
duration can be obtained and set via the {@link #getYieldPeriod(TimeUnit)} and 
{@link #setYieldPeriod(long, TimeUnit)}
 +     * methods.
 +     */
 +    @Override
 +    public void yield() {
 +        final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS);
 +        yield(yieldMillis, TimeUnit.MILLISECONDS);
 +
 +        final String yieldDuration = (yieldMillis > 1000) ? (yieldMillis / 
1000) + " seconds" : yieldMillis + " milliseconds";
 +        LoggerFactory.getLogger(processor.getClass()).debug("{} has chosen to 
yield its resources; will not be scheduled to run again for {}", processor, 
yieldDuration);
 +    }
 +
 +    @Override
 +    public void yield(final long period, final TimeUnit timeUnit) {
 +        final long yieldMillis = TimeUnit.MILLISECONDS.convert(period, 
timeUnit);
 +        yieldExpiration.set(Math.max(yieldExpiration.get(), 
System.currentTimeMillis() + yieldMillis));
 +
 +        processScheduler.yield(this);
 +    }
 +
 +    /**
 +     * @return the number of milliseconds since Epoch at which time this 
processor is to once again be scheduled.
 +     */
 +    @Override
 +    public long getYieldExpiration() {
 +        return yieldExpiration.get();
 +    }
 +
 +    @Override
 +    public long getPenalizationPeriod(final TimeUnit timeUnit) {
 +        return FormatUtils.getTimeDuration(getPenalizationPeriod(), timeUnit 
== null ? DEFAULT_TIME_UNIT : timeUnit);
 +    }
 +
 +    @Override
 +    public String getPenalizationPeriod() {
 +        return penalizationPeriod.get();
 +    }
 +
 +    @Override
 +    public void setPenalizationPeriod(final String penalizationPeriod) {
 +        writeLock.lock();
 +        try {
 +            if (isRunning()) {
 +                throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
 +            }
 +            final long penalizationMillis = 
FormatUtils.getTimeDuration(requireNonNull(penalizationPeriod), 
TimeUnit.MILLISECONDS);
 +            if (penalizationMillis < 0) {
 +                throw new IllegalArgumentException("Penalization duration 
must be positive");
 +            }
 +            this.penalizationPeriod.set(penalizationPeriod);
 +        } finally {
 +            writeLock.unlock();
 +        }
 +    }
 +
 +    /**
 +     * Determines the number of concurrent tasks that may be running for this 
processor.
 +     *
 +     * @param taskCount a number of concurrent tasks this processor may have 
running
 +     * @throws IllegalArgumentException if the given value is less than 1
 +     */
 +    @Override
 +    public void setMaxConcurrentTasks(final int taskCount) {
 +        writeLock.lock();
 +        try {
 +            if (isRunning()) {
 +                throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
 +            }
 +            if (taskCount < 1 && getSchedulingStrategy() != 
SchedulingStrategy.EVENT_DRIVEN) {
 +                throw new IllegalArgumentException();
 +            }
 +            if (!triggeredSerially) {
 +                concurrentTaskCount.set(taskCount);
 +            }
 +        } finally {
 +            writeLock.unlock();
 +        }
 +    }
 +
 +    @Override
 +    public boolean isTriggeredSerially() {
 +        return triggeredSerially;
 +    }
 +
 +    /**
 +     * @return the number of tasks that may execute concurrently for this 
processor
 +     */
 +    @Override
 +    public int getMaxConcurrentTasks() {
 +        return concurrentTaskCount.get();
 +    }
 +
 +    @Override
 +    public LogLevel getBulletinLevel() {
 +        return 
LogRepositoryFactory.getRepository(getIdentifier()).getObservationLevel(BULLETIN_OBSERVER_ID);
 +    }
 +
 +    @Override
 +    public void setBulletinLevel(final LogLevel level) {
 +        
LogRepositoryFactory.getRepository(getIdentifier()).setObservationLevel(BULLETIN_OBSERVER_ID,
 level);
 +    }
 +
 +    @Override
 +    public Set<Connection> getConnections() {
 +        final Set<Connection> allConnections = new HashSet<>();
 +        readLock.lock();
 +        try {
 +            for (final Set<Connection> connectionSet : connections.values()) {
 +                allConnections.addAll(connectionSet);
 +            }
 +        } finally {
 +            readLock.unlock();
 +        }
 +
 +        return allConnections;
 +    }
 +
 +    @Override
 +    public List<Connection> getIncomingConnections() {
 +        return incomingConnectionsRef.get();
 +    }
 +
 +    @Override
 +    public Set<Connection> getConnections(final Relationship relationship) {
 +        final Set<Connection> applicableConnections;
 +        readLock.lock();
 +        try {
 +            applicableConnections = connections.get(relationship);
 +        } finally {
 +            readLock.unlock();
 +        }
 +        return (applicableConnections == null) ? Collections.<Connection> 
emptySet() : Collections.unmodifiableSet(applicableConnections);
 +    }
 +
 +    @Override
 +    public void addConnection(final Connection connection) {
 +        Objects.requireNonNull(connection, "connection cannot be null");
 +
 +        if (!connection.getSource().equals(this) && 
!connection.getDestination().equals(this)) {
 +            throw new IllegalStateException("Cannot a connection to a 
ProcessorNode for which the ProcessorNode is neither the Source nor the 
Destination");
 +        }
 +
 +        writeLock.lock();
 +        try {
 +            List<Connection> updatedIncoming = null;
 +            if (connection.getDestination().equals(this)) {
 +                // don't add the connection twice. This may occur if we have 
a self-loop because we will be told
 +                // to add the connection once because we are the source and 
again because we are the destination.
 +                final List<Connection> incomingConnections = 
incomingConnectionsRef.get();
 +                updatedIncoming = new ArrayList<>(incomingConnections);
 +                if (!updatedIncoming.contains(connection)) {
 +                    updatedIncoming.add(connection);
 +                }
 +            }
 +
 +            if (connection.getSource().equals(this)) {
 +                // don't add the connection twice. This may occur if we have 
a self-loop because we will be told
 +                // to add the connection once because we are the source and 
again because we are the destination.
 +                if (!destinations.containsKey(connection)) {
 +                    for (final Relationship relationship : 
connection.getRelationships()) {
 +                        final Relationship rel = 
getRelationship(relationship.getName());
 +                        Set<Connection> set = connections.get(rel);
 +                        if (set == null) {
 +                            set = new HashSet<>();
 +                            connections.put(rel, set);
 +                        }
 +
 +                        set.add(connection);
 +
 +                        destinations.put(connection, 
connection.getDestination());
 +                    }
 +
 +                    final Set<Relationship> autoTerminated = 
this.undefinedRelationshipsToTerminate.get();
 +                    if (autoTerminated != null) {
 +                        
autoTerminated.removeAll(connection.getRelationships());
 +                        
this.undefinedRelationshipsToTerminate.set(autoTerminated);
 +                    }
 +                }
 +            }
 +
 +            if (updatedIncoming != null) {
 +                
incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
 +            }
 +        } finally {
 +            writeLock.unlock();
 +        }
 +    }
 +
 +    @Override
 +    public boolean hasIncomingConnection() {
 +        return !incomingConnectionsRef.get().isEmpty();
 +    }
 +
 +    @Override
 +    public void updateConnection(final Connection connection) throws 
IllegalStateException {
 +        if (requireNonNull(connection).getSource().equals(this)) {
 +            writeLock.lock();
 +            try {
 +                //
 +                // update any relationships
 +                //
 +                // first check if any relations were removed.
 +                final List<Relationship> existingRelationships = new 
ArrayList<>();
 +                for (final Map.Entry<Relationship, Set<Connection>> entry : 
connections.entrySet()) {
 +                    if (entry.getValue().contains(connection)) {
 +                        existingRelationships.add(entry.getKey());
 +                    }
 +                }
 +
 +                for (final Relationship rel : connection.getRelationships()) {
 +                    if (!existingRelationships.contains(rel)) {
 +                        // relationship was removed. Check if this is legal.
 +                        final Set<Connection> connectionsForRelationship = 
getConnections(rel);
 +                        if (connectionsForRelationship != null && 
connectionsForRelationship.size() == 1 && this.isRunning() && 
!isAutoTerminated(rel) && getRelationships().contains(rel)) {
 +                            // if we are running and we do not terminate 
undefined relationships and this is the only
 +                            // connection that defines the given 
relationship, and that relationship is required,
 +                            // then it is not legal to remove this 
relationship from this connection.
 +                            throw new IllegalStateException("Cannot remove 
relationship " + rel.getName() + " from Connection because doing so would 
invalidate Processor "
 +                                + this + ", which is currently running");
 +                        }
 +                    }
 +                }
 +
 +                // remove the connection from any list that currently contains
 +                for (final Set<Connection> list : connections.values()) {
 +                    list.remove(connection);
 +                }
 +
 +                // add the connection in for all relationships listed.
 +                for (final Relationship rel : connection.getRelationships()) {
 +                    Set<Connection> set = connections.get(rel);
 +                    if (set == null) {
 +                        set = new HashSet<>();
 +                        connections.put(rel, set);
 +                    }
 +                    set.add(connection);
 +                }
 +
 +                // update to the new destination
 +                destinations.put(connection, connection.getDestination());
 +
 +                final Set<Relationship> autoTerminated = 
this.undefinedRelationshipsToTerminate.get();
 +                if (autoTerminated != null) {
 +                    autoTerminated.removeAll(connection.getRelationships());
 +                    
this.undefinedRelationshipsToTerminate.set(autoTerminated);
 +                }
 +            } finally {
 +                writeLock.unlock();
 +            }
 +        }
 +
 +        if (connection.getDestination().equals(this)) {
 +            writeLock.lock();
 +            try {
 +                // update our incoming connections -- we can just remove & 
re-add the connection to
 +                // update the list.
 +                final List<Connection> incomingConnections = 
incomingConnectionsRef.get();
 +                final List<Connection> updatedIncoming = new 
ArrayList<>(incomingConnections);
 +                updatedIncoming.remove(connection);
 +                updatedIncoming.add(connection);
 +                
incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
 +            } finally {
 +                writeLock.unlock();
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public void removeConnection(final Connection connection) {
 +        boolean connectionRemoved = false;
 +
 +        if (requireNonNull(connection).getSource().equals(this)) {
 +            for (final Relationship relationship : 
connection.getRelationships()) {
 +                final Set<Connection> connectionsForRelationship = 
getConnections(relationship);
 +                if ((connectionsForRelationship == null || 
connectionsForRelationship.size() <= 1) && isRunning()) {
 +                    throw new IllegalStateException("This connection cannot 
be removed because its source is running and removing it will invalidate this 
processor");
 +                }
 +            }
 +
 +            writeLock.lock();
 +            try {
 +                for (final Set<Connection> connectionList : 
this.connections.values()) {
 +                    connectionList.remove(connection);
 +                }
 +
 +                connectionRemoved = (destinations.remove(connection) != null);
 +            } finally {
 +                writeLock.unlock();
 +            }
 +        }
 +
 +        if (connection.getDestination().equals(this)) {
 +            writeLock.lock();
 +            try {
 +                final List<Connection> incomingConnections = 
incomingConnectionsRef.get();
 +                if (incomingConnections.contains(connection)) {
 +                    final List<Connection> updatedIncoming = new 
ArrayList<>(incomingConnections);
 +                    updatedIncoming.remove(connection);
 +                    
incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
 +                    return;
 +                }
 +            } finally {
 +                writeLock.unlock();
 +            }
 +        }
 +
 +        if (!connectionRemoved) {
 +            throw new IllegalArgumentException("Cannot remove a connection 
from a ProcessorNode for which the ProcessorNode is not the Source");
 +        }
 +    }
 +
 +    /**
 +     * @param relationshipName name
 +     * @return the relationship for this nodes processor for the given name 
or creates a new relationship for the given name
 +     */
 +    @Override
 +    public Relationship getRelationship(final String relationshipName) {
 +        final Relationship specRel = new 
Relationship.Builder().name(relationshipName).build();
 +        Relationship returnRel = specRel;
 +
 +        final Set<Relationship> relationships;
 +        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
 +            relationships = processor.getRelationships();
 +        }
 +
 +        for (final Relationship rel : relationships) {
 +            if (rel.equals(specRel)) {
 +                returnRel = rel;
 +                break;
 +            }
 +        }
 +        return returnRel;
 +    }
 +
 +    @Override
 +    public Processor getProcessor() {
 +        return this.processor;
 +    }
 +
 +    /**
 +     * @return the Set of destination processors for all relationships 
excluding any destinations that are this processor itself (self-loops)
 +     */
 +    public Set<Connectable> getDestinations() {
 +        final Set<Connectable> nonSelfDestinations = new HashSet<>();
 +        readLock.lock();
 +        try {
 +            for (final Connectable connectable : destinations.values()) {
 +                if (connectable != this) {
 +                    nonSelfDestinations.add(connectable);
 +                }
 +            }
 +        } finally {
 +            readLock.unlock();
 +        }
 +        return nonSelfDestinations;
 +    }
 +
 +    public Set<Connectable> getDestinations(final Relationship relationship) {
 +        readLock.lock();
 +        try {
 +            final Set<Connectable> destinationSet = new HashSet<>();
 +            final Set<Connection> relationshipConnections = 
connections.get(relationship);
 +            if (relationshipConnections != null) {
 +                for (final Connection connection : relationshipConnections) {
 +                    destinationSet.add(destinations.get(connection));
 +                }
 +            }
 +            return destinationSet;
 +        } finally {
 +            readLock.unlock();
 +        }
 +    }
 +
 +    public Set<Relationship> getUndefinedRelationships() {
 +        final Set<Relationship> undefined = new HashSet<>();
 +        readLock.lock();
 +        try {
 +            final Set<Relationship> relationships;
 +            try (final NarCloseable narCloseable = 
NarCloseable.withNarLoader()) {
 +                relationships = processor.getRelationships();
 +            }
 +
 +            if (relationships == null) {
 +                return undefined;
 +            }
 +            for (final Relationship relation : relationships) {
 +                final Set<Connection> connectionSet = 
this.connections.get(relation);
 +                if (connectionSet == null || connectionSet.isEmpty()) {
 +                    undefined.add(relation);
 +                }
 +            }
 +        } finally {
 +            readLock.unlock();
 +        }
 +        return undefined;
 +    }
 +
 +    /**
 +     * Determines if the given node is a destination for this node
 +     *
 +     * @param node node
 +     * @return true if is a direct destination node; false otherwise
 +     */
 +    boolean isRelated(final ProcessorNode node) {
 +        readLock.lock();
 +        try {
 +            return this.destinations.containsValue(node);
 +        } finally {
 +            readLock.unlock();
 +        }
 +    }
 +
 +    @Override
 +    public boolean isRunning() {
 +        readLock.lock();
 +        try {
 +            return getScheduledState().equals(ScheduledState.RUNNING) || 
processScheduler.getActiveThreadCount(this) > 0;
 +        } finally {
 +            readLock.unlock();
 +        }
 +    }
 +
 +    @Override
 +    public int getActiveThreadCount() {
 +        readLock.lock();
 +        try {
 +            return processScheduler.getActiveThreadCount(this);
 +        } finally {
 +            readLock.unlock();
 +        }
 +    }
 +
 +    @Override
 +    public boolean isValid() {
 +        readLock.lock();
 +        try {
 +            final ValidationContext validationContext = 
validationContextFactory.newValidationContext(getProperties(), 
getAnnotationData());
 +
 +            final Collection<ValidationResult> validationResults;
 +            try (final NarCloseable narCloseable = 
NarCloseable.withNarLoader()) {
 +                validationResults = 
getProcessor().validate(validationContext);
 +            }
 +
 +            for (final ValidationResult result : validationResults) {
 +                if (!result.isValid()) {
 +                    return false;
 +                }
 +            }
 +
 +            for (final Relationship undef : getUndefinedRelationships()) {
 +                if (!isAutoTerminated(undef)) {
 +                    return false;
 +                }
 +            }
 +
 +            switch (getInputRequirement()) {
 +                case INPUT_ALLOWED:
 +                    break;
 +                case INPUT_FORBIDDEN: {
 +                    if (!getIncomingConnections().isEmpty()) {
 +                        return false;
 +                    }
 +                    break;
 +                }
 +                case INPUT_REQUIRED: {
 +                    if (getIncomingConnections().isEmpty()) {
 +                        return false;
 +                    }
 +                    break;
 +                }
 +            }
 +        } catch (final Throwable t) {
 +            return false;
 +        } finally {
 +            readLock.unlock();
 +        }
 +
 +        return true;
 +    }
 +
 +    @Override
 +    public Collection<ValidationResult> getValidationErrors() {
 +        final List<ValidationResult> results = new ArrayList<>();
 +        readLock.lock();
 +        try {
 +            final ValidationContext validationContext = 
validationContextFactory.newValidationContext(getProperties(), 
getAnnotationData());
 +
 +            final Collection<ValidationResult> validationResults;
 +            try (final NarCloseable narCloseable = 
NarCloseable.withNarLoader()) {
 +                validationResults = 
getProcessor().validate(validationContext);
 +            }
 +
 +            for (final ValidationResult result : validationResults) {
 +                if (!result.isValid()) {
 +                    results.add(result);
 +                }
 +            }
 +
 +            for (final Relationship relationship : 
getUndefinedRelationships()) {
 +                if (!isAutoTerminated(relationship)) {
 +                    final ValidationResult error = new 
ValidationResult.Builder()
 +                        .explanation("Relationship '" + 
relationship.getName() + "' is not connected to any component and is not 
auto-terminated")
 +                        .subject("Relationship " + relationship.getName())
 +                        .valid(false)
 +                        .build();
 +                    results.add(error);
 +                }
 +            }
 +
 +            switch (getInputRequirement()) {
 +                case INPUT_ALLOWED:
 +                    break;
 +                case INPUT_FORBIDDEN: {
 +                    final int incomingConnCount = 
getIncomingConnections().size();
 +                    if (incomingConnCount != 0) {
 +                        results.add(new ValidationResult.Builder()
-                             .explanation("Processor is currently configured 
with " + incomingConnCount + " upstream connections but does not accept any 
upstream connections")
-                             .subject("Upstream Connections")
++                            .explanation("Processor does not accept Incoming 
Connections but is currently configured with " + incomingConnCount + " Incoming 
Connections")
++                            .subject("Incoming Connections")
 +                            .valid(false)
 +                            .build());
 +                    }
 +                    break;
 +                }
 +                case INPUT_REQUIRED: {
 +                    if (getIncomingConnections().isEmpty()) {
 +                        results.add(new ValidationResult.Builder()
-                             .explanation("Processor requires an upstream 
connection but currently has none")
-                             .subject("Upstream Connections")
++                            .explanation("Processor required at least one 
Incoming Connection in order to perform its function but currently has no 
Incoming Connection")
++                            .subject("Incoming Connections")
 +                            .valid(false)
 +                            .build());
 +                    }
 +                    break;
 +                }
 +            }
 +        } catch (final Throwable t) {
 +            results.add(new ValidationResult.Builder().explanation("Failed to 
run validation due to " + t.toString()).valid(false).build());
 +        } finally {
 +            readLock.unlock();
 +        }
 +        return results;
 +    }
 +
 +    @Override
 +    public Requirement getInputRequirement() {
 +        return inputRequirement;
 +    }
 +
 +    /**
 +     * Establishes node equality (based on the processor's identifier)
 +     *
 +     * @param other node
 +     * @return true if equal
 +     */
 +    @Override
 +    public boolean equals(final Object other) {
 +        if (!(other instanceof ProcessorNode)) {
 +            return false;
 +        }
 +        final ProcessorNode on = (ProcessorNode) other;
 +        return new EqualsBuilder().append(identifier.get(), 
on.getIdentifier()).isEquals();
 +    }
 +
 +    @Override
 +    public int hashCode() {
 +        return new HashCodeBuilder(7, 67).append(identifier).toHashCode();
 +    }
 +
 +    @Override
 +    public Collection<Relationship> getRelationships() {
 +        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
 +            return getProcessor().getRelationships();
 +        }
 +    }
 +
 +    @Override
 +    public String toString() {
 +        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
 +            return getProcessor().toString();
 +        }
 +    }
 +
 +    @Override
 +    public ProcessGroup getProcessGroup() {
 +        return processGroup.get();
 +    }
 +
 +    @Override
 +    public void setProcessGroup(final ProcessGroup group) {
 +        writeLock.lock();
 +        try {
 +            this.processGroup.set(group);
 +        } finally {
 +            writeLock.unlock();
 +        }
 +    }
 +
 +    @Override
 +    public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
 +        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
 +            processor.onTrigger(context, sessionFactory);
 +        }
 +    }
 +
 +    @Override
 +    public ConnectableType getConnectableType() {
 +        return ConnectableType.PROCESSOR;
 +    }
 +
 +    @Override
 +    public void setScheduledState(final ScheduledState scheduledState) {
 +        this.scheduledState.set(scheduledState);
 +        if (!scheduledState.equals(ScheduledState.RUNNING)) { // if user 
stops processor, clear yield expiration
 +            yieldExpiration.set(0L);
 +        }
 +    }
 +
 +    @Override
 +    public void setAnnotationData(final String data) {
 +        writeLock.lock();
 +        try {
 +            if (isRunning()) {
 +                throw new IllegalStateException("Cannot set AnnotationData 
while processor is running");
 +            }
 +
 +            this.annotationData.set(data);
 +        } finally {
 +            writeLock.unlock();
 +        }
 +    }
 +
 +    @Override
 +    public String getAnnotationData() {
 +        return annotationData.get();
 +    }
 +
 +    @Override
 +    public Collection<ValidationResult> validate(final ValidationContext 
validationContext) {
 +        return getValidationErrors();
 +    }
 +
 +    @Override
 +    public void verifyCanDelete() throws IllegalStateException {
 +        verifyCanDelete(false);
 +    }
 +
 +    @Override
 +    public void verifyCanDelete(final boolean ignoreConnections) {
 +        readLock.lock();
 +        try {
 +            if (isRunning()) {
 +                throw new IllegalStateException(this + " is running");
 +            }
 +
 +            if (!ignoreConnections) {
 +                for (final Set<Connection> connectionSet : 
connections.values()) {
 +                    for (final Connection connection : connectionSet) {
 +                        connection.verifyCanDelete();
 +                    }
 +                }
 +
 +                for (final Connection connection : 
incomingConnectionsRef.get()) {
 +                    if (connection.getSource().equals(this)) {
 +                        connection.verifyCanDelete();
 +                    } else {
 +                        throw new IllegalStateException(this + " is the 
destination of another component");
 +                    }
 +                }
 +            }
 +        } finally {
 +            readLock.unlock();
 +        }
 +    }
 +
 +    @Override
 +    public void verifyCanStart() {
 +        readLock.lock();
 +        try {
 +            switch (getScheduledState()) {
 +                case DISABLED:
 +                    throw new IllegalStateException(this + " cannot be 
started because it is disabled");
 +                case RUNNING:
 +                    throw new IllegalStateException(this + " cannot be 
started because it is already running");
 +                case STOPPED:
 +                    break;
 +            }
 +            verifyNoActiveThreads();
 +
 +            if (!isValid()) {
 +                throw new IllegalStateException(this + " is not in a valid 
state");
 +            }
 +        } finally {
 +            readLock.unlock();
 +        }
 +    }
 +
 +    @Override
 +    public void verifyCanStart(final Set<ControllerServiceNode> 
ignoredReferences) {
 +        switch (getScheduledState()) {
 +            case DISABLED:
 +                throw new IllegalStateException(this + " cannot be started 
because it is disabled");
 +            case RUNNING:
 +                throw new IllegalStateException(this + " cannot be started 
because it is already running");
 +            case STOPPED:
 +                break;
 +        }
 +        verifyNoActiveThreads();
 +
 +        final Set<String> ids = new HashSet<>();
 +        for (final ControllerServiceNode node : ignoredReferences) {
 +            ids.add(node.getIdentifier());
 +        }
 +
 +        final Collection<ValidationResult> validationResults = 
getValidationErrors(ids);
 +        for (final ValidationResult result : validationResults) {
 +            if (!result.isValid()) {
 +                throw new IllegalStateException(this + " cannot be started 
because it is not valid: " + result);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public void verifyCanStop() {
 +        if (getScheduledState() != ScheduledState.RUNNING) {
 +            throw new IllegalStateException(this + " is not scheduled to 
run");
 +        }
 +    }
 +
 +    @Override
 +    public void verifyCanUpdate() {
 +        readLock.lock();
 +        try {
 +            if (isRunning()) {
 +                throw new IllegalStateException(this + " is not stopped");
 +            }
 +        } finally {
 +            readLock.unlock();
 +        }
 +    }
 +
 +    @Override
 +    public void verifyCanEnable() {
 +        readLock.lock();
 +        try {
 +            if (getScheduledState() != ScheduledState.DISABLED) {
 +                throw new IllegalStateException(this + " is not disabled");
 +            }
 +
 +            verifyNoActiveThreads();
 +        } finally {
 +            readLock.unlock();
 +        }
 +    }
 +
 +    @Override
 +    public void verifyCanDisable() {
 +        readLock.lock();
 +        try {
 +            if (getScheduledState() != ScheduledState.STOPPED) {
 +               

<TRUNCATED>

Reply via email to