http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 0000000,071be4d..fe72ae4 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@@ -1,0 -1,1243 +1,1257 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.controller; + ++import org.apache.nifi.annotation.behavior.EventDriven; ++import org.apache.nifi.annotation.behavior.SideEffectFree; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.behavior.TriggerSerially; ++import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable; ++import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; + import org.apache.nifi.controller.ProcessScheduler; + import org.apache.nifi.controller.ValidationContextFactory; + import org.apache.nifi.controller.ProcessorNode; ++ + import static java.util.Objects.requireNonNull; + + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Objects; + import java.util.Set; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicInteger; + import java.util.concurrent.atomic.AtomicLong; + import java.util.concurrent.atomic.AtomicReference; + import java.util.concurrent.locks.Lock; + import java.util.concurrent.locks.ReentrantReadWriteLock; + + import org.apache.nifi.components.ValidationContext; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.connectable.Connectable; + import org.apache.nifi.connectable.ConnectableType; + import org.apache.nifi.connectable.Connection; + import org.apache.nifi.connectable.Position; + import org.apache.nifi.controller.service.ControllerServiceProvider; + import org.apache.nifi.groups.ProcessGroup; + import org.apache.nifi.logging.LogLevel; + import org.apache.nifi.logging.LogRepositoryFactory; + import org.apache.nifi.nar.NarCloseable; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSessionFactory; + import org.apache.nifi.processor.Processor; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.EventDriven; -import org.apache.nifi.processor.annotation.SideEffectFree; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.TriggerSerially; -import org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable; -import org.apache.nifi.processor.annotation.TriggerWhenEmpty; + import org.apache.nifi.scheduling.SchedulingStrategy; + import org.apache.nifi.util.FormatUtils; - + import org.apache.commons.lang3.builder.EqualsBuilder; + import org.apache.commons.lang3.builder.HashCodeBuilder; + import org.quartz.CronExpression; + import org.slf4j.LoggerFactory; + + /** + * ProcessorNode provides thread-safe access to a FlowFileProcessor as it exists + * within a controlled flow. This node keeps track of the processor, its + * scheduling information and its relationships to other processors and whatever + * scheduled futures exist for it. Must be thread safe. + * + * @author none + */ + public class StandardProcessorNode extends ProcessorNode implements Connectable { + + public static final String BULLETIN_OBSERVER_ID = "bulletin-observer"; + + public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; + public static final String DEFAULT_YIELD_PERIOD = "1 sec"; + public static final String DEFAULT_PENALIZATION_PERIOD = "30 sec"; + private final AtomicReference<ProcessGroup> processGroup; + private final Processor processor; + private final AtomicReference<String> identifier; + private final Map<Connection, Connectable> destinations; + private final Map<Relationship, Set<Connection>> connections; + private final AtomicReference<Set<Relationship>> undefinedRelationshipsToTerminate; + private final AtomicReference<List<Connection>> incomingConnectionsRef; + private final ReentrantReadWriteLock rwLock; + private final Lock readLock; + private final Lock writeLock; + private final AtomicBoolean isolated; + private final AtomicBoolean lossTolerant; + private final AtomicReference<ScheduledState> scheduledState; + private final AtomicReference<String> comments; + private final AtomicReference<String> name; + private final AtomicReference<Position> position; + private final AtomicReference<String> annotationData; + private final AtomicReference<String> schedulingPeriod; // stored as string so it's presented to user as they entered it + private final AtomicReference<String> yieldPeriod; + private final AtomicReference<String> penalizationPeriod; + private final AtomicReference<Map<String, String>> style; + private final AtomicInteger concurrentTaskCount; + private final AtomicLong yieldExpiration; + private final AtomicLong schedulingNanos; + private final boolean triggerWhenEmpty; + private final boolean sideEffectFree; + private final boolean triggeredSerially; + private final boolean triggerWhenAnyDestinationAvailable; + private final boolean eventDrivenSupported; + private final boolean batchSupported; + private final ValidationContextFactory validationContextFactory; + private final ProcessScheduler processScheduler; + private long runNanos = 0L; + + private SchedulingStrategy schedulingStrategy; // guarded by read/write lock + ++ @SuppressWarnings("deprecation") + StandardProcessorNode(final Processor processor, final String uuid, final ValidationContextFactory validationContextFactory, + final ProcessScheduler scheduler, final ControllerServiceProvider controllerServiceProvider) { + super(processor, uuid, validationContextFactory, controllerServiceProvider); + + this.processor = processor; + identifier = new AtomicReference<>(uuid); + destinations = new HashMap<>(); + connections = new HashMap<>(); + incomingConnectionsRef = new AtomicReference<List<Connection>>(new ArrayList<Connection>()); + scheduledState = new AtomicReference<>(ScheduledState.STOPPED); + rwLock = new ReentrantReadWriteLock(false); + readLock = rwLock.readLock(); + writeLock = rwLock.writeLock(); + lossTolerant = new AtomicBoolean(false); + final Set<Relationship> emptySetOfRelationships = new HashSet<>(); + undefinedRelationshipsToTerminate = new AtomicReference<>(emptySetOfRelationships); + comments = new AtomicReference<>(""); + name = new AtomicReference<>(processor.getClass().getSimpleName()); + schedulingPeriod = new AtomicReference<>("0 sec"); + schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS); + yieldPeriod = new AtomicReference<>(DEFAULT_YIELD_PERIOD); + yieldExpiration = new AtomicLong(0L); + concurrentTaskCount = new AtomicInteger(1); + position = new AtomicReference<>(new Position(0D, 0D)); + style = new AtomicReference<>(Collections.unmodifiableMap(new HashMap<String, String>())); + this.processGroup = new AtomicReference<>(); + processScheduler = scheduler; + annotationData = new AtomicReference<>(); + isolated = new AtomicBoolean(false); + penalizationPeriod = new AtomicReference<>(DEFAULT_PENALIZATION_PERIOD); + - triggerWhenEmpty = processor.getClass().isAnnotationPresent(TriggerWhenEmpty.class); - sideEffectFree = processor.getClass().isAnnotationPresent(SideEffectFree.class); - batchSupported = processor.getClass().isAnnotationPresent(SupportsBatching.class); - triggeredSerially = processor.getClass().isAnnotationPresent(TriggerSerially.class); - triggerWhenAnyDestinationAvailable = processor.getClass().isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class); ++ final Class<?> procClass = processor.getClass(); ++ triggerWhenEmpty = procClass.isAnnotationPresent(TriggerWhenEmpty.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerWhenEmpty.class); ++ sideEffectFree = procClass.isAnnotationPresent(SideEffectFree.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SideEffectFree.class); ++ batchSupported = procClass.isAnnotationPresent(SupportsBatching.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SupportsBatching.class); ++ triggeredSerially = procClass.isAnnotationPresent(TriggerSerially.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerSerially.class); ++ triggerWhenAnyDestinationAvailable = procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable.class); + this.validationContextFactory = validationContextFactory; - eventDrivenSupported = processor.getClass().isAnnotationPresent(EventDriven.class) && !triggeredSerially && !triggerWhenEmpty; ++ eventDrivenSupported = (procClass.isAnnotationPresent(EventDriven.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.EventDriven.class) )&& !triggeredSerially && !triggerWhenEmpty; + schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN; + } + + /** + * @return comments about this specific processor instance + */ + @Override + public String getComments() { + return comments.get(); + } + + /** + * Provides and opportunity to retain information about this particular + * processor instance + * + * @param comments + */ + @Override + public void setComments(final String comments) { + writeLock.lock(); + try { + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } + this.comments.set(comments); + } finally { + writeLock.unlock(); + } + } + + @Override + public ScheduledState getScheduledState() { + return scheduledState.get(); + } + + @Override + public Position getPosition() { + return position.get(); + } + + @Override + public void setPosition(Position position) { + this.position.set(position); + } + + public Map<String, String> getStyle() { + return style.get(); + } + + public void setStyle(final Map<String, String> style) { + if (style != null) { + this.style.set(Collections.unmodifiableMap(new HashMap<>(style))); + } + } + + @Override + public String getIdentifier() { + return identifier.get(); + } + + /** + * @return if true flow file content generated by this processor is + * considered loss tolerant + */ + @Override + public boolean isLossTolerant() { + return lossTolerant.get(); + } + + /** + * @return if true processor runs only on the primary node + */ + public boolean isIsolated() { + return isolated.get(); + } + + /** + * @return true if the processor has the {@link TriggerWhenEmpty} + * annotation, false otherwise. + */ + @Override + public boolean isTriggerWhenEmpty() { + return triggerWhenEmpty; + } + + /** + * @return true if the processor has the {@link SideEffectFree} annotation, + * false otherwise. + */ + public boolean isSideEffectFree() { + return sideEffectFree; + } + + @Override + public boolean isHighThroughputSupported() { + return batchSupported; + } + + /** + * @return true if the processor has the + * {@link TriggerWhenAnyDestinationAvailable} annotation, false otherwise. + */ + public boolean isTriggerWhenAnyDestinationAvailable() { + return triggerWhenAnyDestinationAvailable; + } + + /** + * Indicates whether flow file content made by this processor must be + * persisted + * + * @param lossTolerant + */ + @Override + public void setLossTolerant(final boolean lossTolerant) { + writeLock.lock(); + try { + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } + this.lossTolerant.set(lossTolerant); + } finally { + writeLock.unlock(); + } + } + + /** + * Indicates whether the processor runs on only the primary node. + * + * @param isolated + */ + public void setIsolated(final boolean isolated) { + writeLock.lock(); + try { + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } + this.isolated.set(isolated); + } finally { + writeLock.unlock(); + } + } + + @Override + public boolean isAutoTerminated(final Relationship relationship) { + final Set<Relationship> terminatable = undefinedRelationshipsToTerminate.get(); + if (terminatable == null) { + return false; + } + return terminatable.contains(relationship); + } + + /** + * Indicates whether flow files transferred to undefined relationships + * should be terminated + * + * @param terminate + */ + @Override + public void setAutoTerminatedRelationships(final Set<Relationship> terminate) { + writeLock.lock(); + try { + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } + + for (final Relationship rel : terminate) { + if (!getConnections(rel).isEmpty()) { + throw new IllegalStateException("Cannot mark relationship '" + rel.getName() + "' as auto-terminated because Connection already exists with this relationship"); + } + } + undefinedRelationshipsToTerminate.set(new HashSet<>(terminate)); + } finally { + writeLock.unlock(); + } + } + + /** + * @return an unmodifiable Set that contains all of the + * ProcessorRelationship objects that are configured to be auto-terminated + */ + public Set<Relationship> getAutoTerminatedRelationships() { + Set<Relationship> relationships = undefinedRelationshipsToTerminate.get(); + if (relationships == null) { + relationships = new HashSet<>(); + } + return Collections.unmodifiableSet(relationships); + } + + @Override + public String getName() { + return name.get(); + } + + /** + * @return the value of the processor's {@link CapabilityDescription} + * annotation, if one exists, else <code>null</code>. + */ ++ @SuppressWarnings("deprecation") + public String getProcessorDescription() { - final CapabilityDescription capDesc = processor.getClass().getAnnotation(CapabilityDescription.class); - return (capDesc == null) ? null : capDesc.value(); ++ CapabilityDescription capDesc = processor.getClass().getAnnotation(CapabilityDescription.class); ++ String description = null; ++ if ( capDesc != null ) { ++ description = capDesc.value(); ++ } else { ++ final org.apache.nifi.processor.annotation.CapabilityDescription deprecatedCapDesc = ++ processor.getClass().getAnnotation(org.apache.nifi.processor.annotation.CapabilityDescription.class); ++ if ( deprecatedCapDesc != null ) { ++ description = deprecatedCapDesc.value(); ++ } ++ } ++ ++ return description; + } + + @Override + public void setName(final String name) { + writeLock.lock(); + try { + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } + this.name.set(name); + } finally { + writeLock.unlock(); + } + } + + /** + * @param timeUnit determines the unit of time to represent the scheduling + * period. If null will be reported in units of + * {@link #DEFAULT_SCHEDULING_TIME_UNIT} + * @return the schedule period that should elapse before subsequent cycles + * of this processor's tasks + */ + @Override + public long getSchedulingPeriod(final TimeUnit timeUnit) { + return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS); + } + + public boolean isEventDrivenSupported() { + readLock.lock(); + try { + return this.eventDrivenSupported; + } finally { + readLock.unlock(); + } + } + + /** + * Updates the Scheduling Strategy used for this Processor + * + * @param schedulingStrategy + * + * @throws IllegalArgumentException if the SchedulingStrategy is not not + * applicable for this Processor + */ + public void setSchedulingStrategy(final SchedulingStrategy schedulingStrategy) { + writeLock.lock(); + try { + if (schedulingStrategy == SchedulingStrategy.EVENT_DRIVEN && !eventDrivenSupported) { + // not valid. Just ignore it. We don't throw an Exception because if a developer changes a Processor so that + // it no longer supports EventDriven mode, we don't want the app to fail to startup if it was already in Event-Driven + // Mode. Instead, we will simply leave it in Timer-Driven mode + return; + } + + this.schedulingStrategy = schedulingStrategy; + setIsolated(schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY); + } finally { + writeLock.unlock(); + } + } + + /** + * Returns the currently configured scheduling strategy + * + * @return + */ + public SchedulingStrategy getSchedulingStrategy() { + readLock.lock(); + try { + return this.schedulingStrategy; + } finally { + readLock.unlock(); + } + } + + @Override + public String getSchedulingPeriod() { + return schedulingPeriod.get(); + } + + /** + * @param value the number of <code>timeUnit</code>s between scheduling + * intervals. + * @param timeUnit determines the unit of time to represent the scheduling + * period. + */ + @Override + public void setScheduldingPeriod(final String schedulingPeriod) { + writeLock.lock(); + try { + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } + + switch (schedulingStrategy) { + case CRON_DRIVEN: { + try { + new CronExpression(schedulingPeriod); + } catch (final Exception e) { + throw new IllegalArgumentException("Scheduling Period is not a valid cron expression: " + schedulingPeriod); + } + } + break; + case PRIMARY_NODE_ONLY: + case TIMER_DRIVEN: { + final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), TimeUnit.NANOSECONDS); + if (schedulingNanos < 0) { + throw new IllegalArgumentException("Scheduling Period must be positive"); + } + this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos)); + } + break; + case EVENT_DRIVEN: + default: + return; + } + + this.schedulingPeriod.set(schedulingPeriod); + } finally { + writeLock.unlock(); + } + } + + @Override + public long getRunDuration(final TimeUnit timeUnit) { + readLock.lock(); + try { + return timeUnit.convert(this.runNanos, TimeUnit.NANOSECONDS); + } finally { + readLock.unlock(); + } + } + + @Override + public void setRunDuration(final long duration, final TimeUnit timeUnit) { + writeLock.lock(); + try { + if (duration < 0) { + throw new IllegalArgumentException("Run Duration must be non-negative value; cannot set to " + timeUnit.toSeconds(duration) + " seconds"); + } + + this.runNanos = timeUnit.toNanos(duration); + } finally { + writeLock.unlock(); + } + } + + /** + * @param timeUnit determines the unit of time to represent the yield + * period. If null will be reported in units of + * {@link #DEFAULT_SCHEDULING_TIME_UNIT}. + * @return + */ + @Override + public long getYieldPeriod(final TimeUnit timeUnit) { + return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); + } + + public String getYieldPeriod() { + return yieldPeriod.get(); + } + + /** + * Updates the amount of time that this processor should avoid being + * scheduled when the processor calls + * {@link nifi.processor.ProcessContext#yield() ProcessContext.yield()} + * + * @param yieldPeriod + */ + @Override + public void setYieldPeriod(final String yieldPeriod) { + writeLock.lock(); + try { + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } + final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS); + if (yieldMillis < 0) { + throw new IllegalArgumentException("Yield duration must be positive"); + } + this.yieldPeriod.set(yieldPeriod); + } finally { + writeLock.unlock(); + } + } + + /** + * Causes the processor not to be scheduled for some period of time. This + * duration can be obtained and set via the + * {@link #getYieldPeriod(TimeUnit)} and + * {@link #setYieldPeriod(long, TimeUnit)} methods. + */ + @Override + public void yield() { + final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS); + yield(yieldMillis, TimeUnit.MILLISECONDS); + + final String yieldDuration = (yieldMillis > 1000) ? (yieldMillis / 1000) + " seconds" : yieldMillis + " milliseconds"; + LoggerFactory.getLogger(processor.getClass()).debug("{} has chosen to yield its resources; will not be scheduled to run again for {}", processor, yieldDuration); + } + + public void yield(final long period, final TimeUnit timeUnit) { + final long yieldMillis = TimeUnit.MILLISECONDS.convert(period, timeUnit); + yieldExpiration.set(Math.max(yieldExpiration.get(), System.currentTimeMillis() + yieldMillis)); + + processScheduler.yield(this); + } + + /** + * @return the number of milliseconds since Epoch at which time this + * processor is to once again be scheduled. + */ + @Override + public long getYieldExpiration() { + return yieldExpiration.get(); + } + + @Override + public long getPenalizationPeriod(final TimeUnit timeUnit) { + return FormatUtils.getTimeDuration(getPenalizationPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); + } + + @Override + public String getPenalizationPeriod() { + return penalizationPeriod.get(); + } + + @Override + public void setPenalizationPeriod(final String penalizationPeriod) { + writeLock.lock(); + try { + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } + final long penalizationMillis = FormatUtils.getTimeDuration(requireNonNull(penalizationPeriod), TimeUnit.MILLISECONDS); + if (penalizationMillis < 0) { + throw new IllegalArgumentException("Penalization duration must be positive"); + } + this.penalizationPeriod.set(penalizationPeriod); + } finally { + writeLock.unlock(); + } + } + + /** + * Determines the number of concurrent tasks that may be running for this + * processor. + * + * @param taskCount a number of concurrent tasks this processor may have + * running + * @throws IllegalArgumentException if the given value is less than 1 + */ + @Override + public void setMaxConcurrentTasks(final int taskCount) { + writeLock.lock(); + try { + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } + if (taskCount < 1 && getSchedulingStrategy() != SchedulingStrategy.EVENT_DRIVEN) { + throw new IllegalArgumentException(); + } + if (!triggeredSerially) { + concurrentTaskCount.set(taskCount); + } + } finally { + writeLock.unlock(); + } + } + + public boolean isTriggeredSerially() { + return triggeredSerially; + } + + /** + * @return the number of tasks that may execute concurrently for this + * processor + */ + @Override + public int getMaxConcurrentTasks() { + return concurrentTaskCount.get(); + } + + public LogLevel getBulletinLevel() { + return LogRepositoryFactory.getRepository(getIdentifier()).getObservationLevel(BULLETIN_OBSERVER_ID); + } + + public void setBulletinLevel(final LogLevel level) { + LogRepositoryFactory.getRepository(getIdentifier()).setObservationLevel(BULLETIN_OBSERVER_ID, level); + } + + @Override + public Set<Connection> getConnections() { + final Set<Connection> allConnections = new HashSet<>(); + readLock.lock(); + try { + for (final Set<Connection> connectionSet : connections.values()) { + allConnections.addAll(connectionSet); + } + } finally { + readLock.unlock(); + } + + return allConnections; + } + + @Override + public List<Connection> getIncomingConnections() { + return incomingConnectionsRef.get(); + } + + @Override + public Set<Connection> getConnections(final Relationship relationship) { + final Set<Connection> applicableConnections; + readLock.lock(); + try { + applicableConnections = connections.get(relationship); + } finally { + readLock.unlock(); + } + return (applicableConnections == null) ? Collections.<Connection>emptySet() : Collections.unmodifiableSet(applicableConnections); + } + + @Override + public void addConnection(final Connection connection) { + Objects.requireNonNull(connection, "connection cannot be null"); + + if (!connection.getSource().equals(this) && !connection.getDestination().equals(this)) { + throw new IllegalStateException("Cannot a connection to a ProcessorNode for which the ProcessorNode is neither the Source nor the Destination"); + } + + writeLock.lock(); + try { + List<Connection> updatedIncoming = null; + if (connection.getDestination().equals(this)) { + // don't add the connection twice. This may occur if we have a self-loop because we will be told + // to add the connection once because we are the source and again because we are the destination. + final List<Connection> incomingConnections = incomingConnectionsRef.get(); + updatedIncoming = new ArrayList<>(incomingConnections); + if (!updatedIncoming.contains(connection)) { + updatedIncoming.add(connection); + } + } + + if (connection.getSource().equals(this)) { + // don't add the connection twice. This may occur if we have a self-loop because we will be told + // to add the connection once because we are the source and again because we are the destination. + if (!destinations.containsKey(connection)) { + for (final Relationship relationship : connection.getRelationships()) { + final Relationship rel = getRelationship(relationship.getName()); + Set<Connection> set = connections.get(rel); + if (set == null) { + set = new HashSet<>(); + connections.put(rel, set); + } + + set.add(connection); + + destinations.put(connection, connection.getDestination()); + } + + final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get(); + if (autoTerminated != null) { + autoTerminated.removeAll(connection.getRelationships()); + this.undefinedRelationshipsToTerminate.set(autoTerminated); + } + } + } + + if (updatedIncoming != null) { + incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming)); + } + } finally { + writeLock.unlock(); + } + } + + @Override + public boolean hasIncomingConnection() { + return !incomingConnectionsRef.get().isEmpty(); + } + + @Override + public void updateConnection(final Connection connection) throws IllegalStateException { + if (requireNonNull(connection).getSource().equals(this)) { + writeLock.lock(); + try { + // + // update any relationships + // + // first check if any relations were removed. + final List<Relationship> existingRelationships = new ArrayList<>(); + for (final Map.Entry<Relationship, Set<Connection>> entry : connections.entrySet()) { + if (entry.getValue().contains(connection)) { + existingRelationships.add(entry.getKey()); + } + } + + for (final Relationship rel : connection.getRelationships()) { + if (!existingRelationships.contains(rel)) { + // relationship was removed. Check if this is legal. + final Set<Connection> connectionsForRelationship = getConnections(rel); + if (connectionsForRelationship != null && connectionsForRelationship.size() == 1 && this.isRunning() && !isAutoTerminated(rel) && getRelationships().contains(rel)) { + // if we are running and we do not terminate undefined relationships and this is the only + // connection that defines the given relationship, and that relationship is required, + // then it is not legal to remove this relationship from this connection. + throw new IllegalStateException("Cannot remove relationship " + rel.getName() + " from Connection because doing so would invalidate Processor " + this + ", which is currently running"); + } + } + } + + // remove the connection from any list that currently contains + for (final Set<Connection> list : connections.values()) { + list.remove(connection); + } + + // add the connection in for all relationships listed. + for (final Relationship rel : connection.getRelationships()) { + Set<Connection> set = connections.get(rel); + if (set == null) { + set = new HashSet<>(); + connections.put(rel, set); + } + set.add(connection); + } + + // update to the new destination + destinations.put(connection, connection.getDestination()); + + final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get(); + if (autoTerminated != null) { + autoTerminated.removeAll(connection.getRelationships()); + this.undefinedRelationshipsToTerminate.set(autoTerminated); + } + } finally { + writeLock.unlock(); + } + } + + if (connection.getDestination().equals(this)) { + writeLock.lock(); + try { + // update our incoming connections -- we can just remove & re-add the connection to + // update the list. + final List<Connection> incomingConnections = incomingConnectionsRef.get(); + final List<Connection> updatedIncoming = new ArrayList<>(incomingConnections); + updatedIncoming.remove(connection); + updatedIncoming.add(connection); + incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming)); + } finally { + writeLock.unlock(); + } + } + } + + @Override + public void removeConnection(final Connection connection) { + boolean connectionRemoved = false; + + if (requireNonNull(connection).getSource().equals(this)) { + for (final Relationship relationship : connection.getRelationships()) { + final Set<Connection> connectionsForRelationship = getConnections(relationship); + if ((connectionsForRelationship == null || connectionsForRelationship.size() <= 1) && isRunning()) { + throw new IllegalStateException("This connection cannot be removed because its source is running and removing it will invalidate this processor"); + } + } + + writeLock.lock(); + try { + for (final Set<Connection> connectionList : this.connections.values()) { + connectionList.remove(connection); + } + + connectionRemoved = (destinations.remove(connection) != null); + } finally { + writeLock.unlock(); + } + } + + if (connection.getDestination().equals(this)) { + writeLock.lock(); + try { + final List<Connection> incomingConnections = incomingConnectionsRef.get(); + if (incomingConnections.contains(connection)) { + final List<Connection> updatedIncoming = new ArrayList<>(incomingConnections); + updatedIncoming.remove(connection); + incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming)); + return; + } + } finally { + writeLock.unlock(); + } + } + + if (!connectionRemoved) { + throw new IllegalArgumentException("Cannot remove a connection from a ProcessorNode for which the ProcessorNode is not the Source"); + } + } + + /** + * Gets the relationship for this nodes processor for the given name or + * creates a new relationship for the given name. + * + * @param relationshipName + * @return + */ + @Override + public Relationship getRelationship(final String relationshipName) { + final Relationship specRel = new Relationship.Builder().name(relationshipName).build(); + Relationship returnRel = specRel; + + final Set<Relationship> relationships; + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + relationships = processor.getRelationships(); + } + + for (final Relationship rel : relationships) { + if (rel.equals(specRel)) { + returnRel = rel; + break; + } + } + return returnRel; + } + + public Processor getProcessor() { + return this.processor; + } + + /** + * Obtains the Set of destination processors for all relationships excluding + * any destinations that are this processor itself (self-loops) + * + * @return + */ + public Set<Connectable> getDestinations() { + final Set<Connectable> nonSelfDestinations = new HashSet<>(); + readLock.lock(); + try { + for (final Connectable connectable : destinations.values()) { + if (connectable != this) { + nonSelfDestinations.add(connectable); + } + } + } finally { + readLock.unlock(); + } + return nonSelfDestinations; + } + + public Set<Connectable> getDestinations(final Relationship relationship) { + readLock.lock(); + try { + final Set<Connectable> destinationSet = new HashSet<>(); + final Set<Connection> relationshipConnections = connections.get(relationship); + if (relationshipConnections != null) { + for (final Connection connection : relationshipConnections) { + destinationSet.add(destinations.get(connection)); + } + } + return destinationSet; + } finally { + readLock.unlock(); + } + } + + public Set<Relationship> getUndefinedRelationships() { + final Set<Relationship> undefined = new HashSet<>(); + readLock.lock(); + try { + final Set<Relationship> relationships; + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + relationships = processor.getRelationships(); + } + + if (relationships == null) { + return undefined; + } + for (final Relationship relation : relationships) { + final Set<Connection> connectionSet = this.connections.get(relation); + if (connectionSet == null || connectionSet.isEmpty()) { + undefined.add(relation); + } + } + } finally { + readLock.unlock(); + } + return undefined; + } + + /** + * Determines if the given node is a destination for this node + * + * @param node + * @return true if is a direct destination node; false otherwise + */ + boolean isRelated(final ProcessorNode node) { + readLock.lock(); + try { + return this.destinations.containsValue(node); + } finally { + readLock.unlock(); + } + } + + @Override + public boolean isRunning() { + readLock.lock(); + try { + return getScheduledState().equals(ScheduledState.RUNNING) || processScheduler.getActiveThreadCount(this) > 0; + } finally { + readLock.unlock(); + } + } + + @Override + public boolean isValid() { + readLock.lock(); + try { + final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), getAnnotationData()); + + final Collection<ValidationResult> validationResults; + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + validationResults = getProcessor().validate(validationContext); + } + + for (final ValidationResult result : validationResults) { + if (!result.isValid()) { + return false; + } + } + + for (final Relationship undef : getUndefinedRelationships()) { + if (!isAutoTerminated(undef)) { + return false; + } + } + } catch (final Throwable t) { + return false; + } finally { + readLock.unlock(); + } + + return true; + } + + @Override + public Collection<ValidationResult> getValidationErrors() { + final List<ValidationResult> results = new ArrayList<>(); + readLock.lock(); + try { + final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), getAnnotationData()); + + final Collection<ValidationResult> validationResults; + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + validationResults = getProcessor().validate(validationContext); + } + + for (final ValidationResult result : validationResults) { + if (!result.isValid()) { + results.add(result); + } + } + + for (final Relationship relationship : getUndefinedRelationships()) { + if (!isAutoTerminated(relationship)) { + final ValidationResult error = new ValidationResult.Builder() + .explanation("Relationship '" + relationship.getName() + "' is not connected to any component and is not auto-terminated") + .subject("Relationship " + relationship.getName()) + .valid(false) + .build(); + results.add(error); + } + } + } catch (final Throwable t) { + results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString()).valid(false).build()); + } finally { + readLock.unlock(); + } + return results; + } + + /** + * Establishes node equality (based on the processor's identifier) + * + * @param other + * @return + */ + @Override + public boolean equals(final Object other) { + if (!(other instanceof ProcessorNode)) { + return false; + } + final ProcessorNode on = (ProcessorNode) other; + return new EqualsBuilder().append(identifier.get(), on.getIdentifier()).isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(7, 67).append(identifier).toHashCode(); + } + + @Override + public Collection<Relationship> getRelationships() { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + return getProcessor().getRelationships(); + } + } + + @Override + public String toString() { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + return getProcessor().toString(); + } + } + + @Override + public ProcessGroup getProcessGroup() { + return processGroup.get(); + } + + @Override + public void setProcessGroup(final ProcessGroup group) { + writeLock.lock(); + try { + this.processGroup.set(group); + } finally { + writeLock.unlock(); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + processor.onTrigger(context, sessionFactory); + } + } + + @Override + public ConnectableType getConnectableType() { + return ConnectableType.PROCESSOR; + } + + public void setScheduledState(final ScheduledState scheduledState) { + this.scheduledState.set(scheduledState); + if (!scheduledState.equals(ScheduledState.RUNNING)) { // if user stops processor, clear yield expiration + yieldExpiration.set(0L); + } + } + + @Override + public void setAnnotationData(final String data) { + writeLock.lock(); + try { + if (isRunning()) { + throw new IllegalStateException("Cannot set AnnotationData while processor is running"); + } + + this.annotationData.set(data); + } finally { + writeLock.unlock(); + } + } + + @Override + public String getAnnotationData() { + return annotationData.get(); + } + + @Override + public Collection<ValidationResult> validate(final ValidationContext validationContext) { + return processor.validate(validationContext); + } + + @Override + public void verifyCanDelete() throws IllegalStateException { + verifyCanDelete(false); + } + + @Override + public void verifyCanDelete(final boolean ignoreConnections) { + readLock.lock(); + try { + if (isRunning()) { + throw new IllegalStateException(this + " is running"); + } + + if (!ignoreConnections) { + for (final Set<Connection> connectionSet : connections.values()) { + for (final Connection connection : connectionSet) { + connection.verifyCanDelete(); + } + } + + for (final Connection connection : incomingConnectionsRef.get()) { + if (connection.getSource().equals(this)) { + connection.verifyCanDelete(); + } else { + throw new IllegalStateException(this + " is the destination of another component"); + } + } + } + } finally { + readLock.unlock(); + } + } + + @Override + public void verifyCanStart() { + readLock.lock(); + try { + if (scheduledState.get() != ScheduledState.STOPPED) { + throw new IllegalStateException(this + " is not stopped"); + } + verifyNoActiveThreads(); + + if (!isValid()) { + throw new IllegalStateException(this + " is not in a valid state"); + } + } finally { + readLock.unlock(); + } + } + + @Override + public void verifyCanStop() { + if (getScheduledState() != ScheduledState.RUNNING) { + throw new IllegalStateException(this + " is not scheduled to run"); + } + } + + @Override + public void verifyCanUpdate() { + readLock.lock(); + try { + if (isRunning()) { + throw new IllegalStateException(this + " is not stopped"); + } + } finally { + readLock.unlock(); + } + } + + @Override + public void verifyCanEnable() { + readLock.lock(); + try { + if (getScheduledState() != ScheduledState.DISABLED) { + throw new IllegalStateException(this + " is not disabled"); + } + + verifyNoActiveThreads(); + } finally { + readLock.unlock(); + } + } + + @Override + public void verifyCanDisable() { + readLock.lock(); + try { + if (getScheduledState() != ScheduledState.STOPPED) { + throw new IllegalStateException(this + " is not stopped"); + } + verifyNoActiveThreads(); + } finally { + readLock.unlock(); + } + } + + private void verifyNoActiveThreads() throws IllegalStateException { + final int threadCount = processScheduler.getActiveThreadCount(this); + if (threadCount > 0) { + throw new IllegalStateException(this + " has " + threadCount + " threads still active"); + } + } + + @Override + public void verifyModifiable() throws IllegalStateException { + if (isRunning()) { + throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); + } + } + }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java index 0000000,6c27470..7c3734a mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java @@@ -1,0 -1,111 +1,210 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.controller.reporting; + + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicReference; + + import org.apache.nifi.controller.AbstractConfiguredComponent; + import org.apache.nifi.controller.Availability; + import org.apache.nifi.controller.ConfigurationContext; + import org.apache.nifi.controller.ControllerServiceLookup; + import org.apache.nifi.controller.ProcessScheduler; + import org.apache.nifi.controller.ReportingTaskNode; ++import org.apache.nifi.controller.ScheduledState; + import org.apache.nifi.controller.ValidationContextFactory; ++import org.apache.nifi.controller.annotation.OnConfigured; ++import org.apache.nifi.controller.exception.ProcessorLifeCycleException; + import org.apache.nifi.controller.service.ControllerServiceProvider; + import org.apache.nifi.controller.service.StandardConfigurationContext; ++import org.apache.nifi.nar.NarCloseable; + import org.apache.nifi.reporting.ReportingTask; + import org.apache.nifi.scheduling.SchedulingStrategy; + import org.apache.nifi.util.FormatUtils; ++import org.apache.nifi.util.ReflectionUtils; + + public abstract class AbstractReportingTaskNode extends AbstractConfiguredComponent implements ReportingTaskNode { + + private final ReportingTask reportingTask; + private final ProcessScheduler processScheduler; + private final ControllerServiceLookup serviceLookup; + + private final AtomicReference<SchedulingStrategy> schedulingStrategy = new AtomicReference<>(SchedulingStrategy.TIMER_DRIVEN); + private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("5 mins"); + private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY); + ++ private volatile ScheduledState scheduledState = ScheduledState.STOPPED; ++ + public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id, + final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler, + final ValidationContextFactory validationContextFactory) { + super(reportingTask, id, validationContextFactory, controllerServiceProvider); + this.reportingTask = reportingTask; + this.processScheduler = processScheduler; + this.serviceLookup = controllerServiceProvider; + } + + @Override + public Availability getAvailability() { + return availability.get(); + } + + @Override + public void setAvailability(final Availability availability) { + this.availability.set(availability); + } + + @Override + public void setSchedulingStrategy(final SchedulingStrategy schedulingStrategy) { + this.schedulingStrategy.set(schedulingStrategy); + } + + @Override + public SchedulingStrategy getSchedulingStrategy() { + return schedulingStrategy.get(); + } + + @Override + public String getSchedulingPeriod() { + return schedulingPeriod.get(); + } + + @Override + public long getSchedulingPeriod(final TimeUnit timeUnit) { + return FormatUtils.getTimeDuration(schedulingPeriod.get(), timeUnit); + } + + @Override + public void setScheduldingPeriod(final String schedulingPeriod) { + this.schedulingPeriod.set(schedulingPeriod); + } + + @Override + public ReportingTask getReportingTask() { + return reportingTask; + } + + @Override + public boolean isRunning() { + return processScheduler.isScheduled(this) || processScheduler.getActiveThreadCount(this) > 0; + } + + @Override + public ConfigurationContext getConfigurationContext() { + return new StandardConfigurationContext(this, serviceLookup); + } + + @Override + public void verifyModifiable() throws IllegalStateException { + if (isRunning()) { + throw new IllegalStateException("Cannot modify Reporting Task while the Reporting Task is running"); + } + } + ++ @Override ++ public ScheduledState getScheduledState() { ++ return scheduledState; ++ } ++ ++ @Override ++ public void setScheduledState(final ScheduledState state) { ++ this.scheduledState = state; ++ } ++ ++ @Override ++ public void setProperty(final String name, final String value) { ++ super.setProperty(name, value); ++ ++ onConfigured(); ++ } ++ ++ @Override ++ public boolean removeProperty(String name) { ++ final boolean removed = super.removeProperty(name); ++ if ( removed ) { ++ onConfigured(); ++ } ++ ++ return removed; ++ } ++ ++ private void onConfigured() { ++ // We need to invoke any method annotation with the OnConfigured annotation in order to ++ // maintain backward compatibility. This will be removed when we remove the old, deprecated annotations. ++ try (final NarCloseable x = NarCloseable.withNarLoader()) { ++ final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceLookup); ++ ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, configContext); ++ } catch (final Exception e) { ++ throw new ProcessorLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + reportingTask, e); ++ } ++ } ++ ++ public boolean isDisabled() { ++ return scheduledState == ScheduledState.DISABLED; ++ } ++ ++ @Override ++ public void verifyCanDelete() { ++ if (isRunning()) { ++ throw new IllegalStateException("Cannot delete " + reportingTask + " because it is currently running"); ++ } ++ } ++ ++ @Override ++ public void verifyCanDisable() { ++ if ( isRunning() ) { ++ throw new IllegalStateException("Cannot disable " + reportingTask + " because it is currently running"); ++ } ++ ++ if ( isDisabled() ) { ++ throw new IllegalStateException("Cannot disable " + reportingTask + " because it is already disabled"); ++ } ++ } ++ ++ ++ @Override ++ public void verifyCanEnable() { ++ if ( !isDisabled() ) { ++ throw new IllegalStateException("Cannot enable " + reportingTask + " because it is not disabled"); ++ } ++ } ++ ++ @Override ++ public void verifyCanStart() { ++ if ( isDisabled() ) { ++ throw new IllegalStateException("Cannot start " + reportingTask + " because it is currently disabled"); ++ } ++ ++ if ( isRunning() ) { ++ throw new IllegalStateException("Cannot start " + reportingTask + " because it is already running"); ++ } ++ } ++ ++ @Override ++ public void verifyCanStop() { ++ if ( !isRunning() ) { ++ throw new IllegalStateException("Cannot stop " + reportingTask + " because it is not running"); ++ } ++ } ++ ++ @Override ++ public void verifyCanUpdate() { ++ if ( isRunning() ) { ++ throw new IllegalStateException("Cannot update " + reportingTask + " because it is currently running"); ++ } ++ } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java index 0000000,af801bb..7455bf8 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java @@@ -1,0 -1,346 +1,346 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.controller.scheduling; + + import java.io.IOException; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicInteger; + import java.util.concurrent.atomic.AtomicLong; + ++import org.apache.nifi.annotation.lifecycle.OnStopped; + import org.apache.nifi.connectable.Connectable; + import org.apache.nifi.controller.EventBasedWorker; + import org.apache.nifi.controller.EventDrivenWorkerQueue; + import org.apache.nifi.controller.ProcessorNode; + import org.apache.nifi.controller.ReportingTaskNode; + import org.apache.nifi.controller.repository.BatchingSessionFactory; + import org.apache.nifi.controller.repository.ProcessContext; + import org.apache.nifi.controller.repository.StandardFlowFileEvent; + import org.apache.nifi.controller.repository.StandardProcessSession; + import org.apache.nifi.controller.repository.StandardProcessSessionFactory; + import org.apache.nifi.controller.service.ControllerServiceProvider; + import org.apache.nifi.encrypt.StringEncryptor; + import org.apache.nifi.engine.FlowEngine; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.nar.NarCloseable; + import org.apache.nifi.processor.ProcessSessionFactory; + import org.apache.nifi.processor.SimpleProcessLogger; + import org.apache.nifi.processor.StandardProcessContext; -import org.apache.nifi.processor.annotation.OnStopped; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.util.Connectables; + import org.apache.nifi.util.FormatUtils; + import org.apache.nifi.util.ReflectionUtils; - + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + public class EventDrivenSchedulingAgent implements SchedulingAgent { + + private static final Logger logger = LoggerFactory.getLogger(EventDrivenSchedulingAgent.class); + + private final FlowEngine flowEngine; + private final ControllerServiceProvider controllerServiceProvider; + private final EventDrivenWorkerQueue workerQueue; + private final ProcessContextFactory contextFactory; + private final AtomicInteger maxThreadCount; + private final StringEncryptor encryptor; + + private volatile String adminYieldDuration = "1 sec"; + + private final ConcurrentMap<Connectable, AtomicLong> connectionIndexMap = new ConcurrentHashMap<>(); + private final ConcurrentMap<Connectable, ScheduleState> scheduleStates = new ConcurrentHashMap<>(); + + public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider flowController, final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor) { + this.flowEngine = flowEngine; + this.controllerServiceProvider = flowController; + this.workerQueue = workerQueue; + this.contextFactory = contextFactory; + this.maxThreadCount = new AtomicInteger(maxThreadCount); + this.encryptor = encryptor; + + for (int i = 0; i < maxThreadCount; i++) { + final Runnable eventDrivenTask = new EventDrivenTask(workerQueue); + flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 30000, TimeUnit.NANOSECONDS); + } + } + + @Override + public void shutdown() { + flowEngine.shutdown(); + } + + @Override + public void schedule(final ReportingTaskNode taskNode, ScheduleState scheduleState) { + throw new UnsupportedOperationException("ReportingTasks cannot be scheduled in Event-Driven Mode"); + } + + @Override + public void unschedule(ReportingTaskNode taskNode, ScheduleState scheduleState) { + throw new UnsupportedOperationException("ReportingTasks cannot be scheduled in Event-Driven Mode"); + } + + @Override + public void schedule(final Connectable connectable, final ScheduleState scheduleState) { + workerQueue.resumeWork(connectable); + logger.info("Scheduled {} to run in Event-Driven mode", connectable); + scheduleStates.put(connectable, scheduleState); + } + + @Override + public void unschedule(final Connectable connectable, final ScheduleState scheduleState) { + workerQueue.suspendWork(connectable); + logger.info("Stopped scheduling {} to run", connectable); + } + + @Override + public void onEvent(final Connectable connectable) { + workerQueue.offer(connectable); + } + + @Override + public void setMaxThreadCount(final int maxThreadCount) { + final int oldMax = this.maxThreadCount.getAndSet(maxThreadCount); + if (maxThreadCount > oldMax) { + // if more threads have been allocated, add more tasks to the work queue + final int tasksToAdd = maxThreadCount - oldMax; + for (int i = 0; i < tasksToAdd; i++) { + final Runnable eventDrivenTask = new EventDrivenTask(workerQueue); + flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 30000, TimeUnit.NANOSECONDS); + } + } + } + + @Override + public void setAdministrativeYieldDuration(final String yieldDuration) { + this.adminYieldDuration = yieldDuration; + } + + @Override + public String getAdministrativeYieldDuration() { + return adminYieldDuration; + } + + @Override + public long getAdministrativeYieldDuration(final TimeUnit timeUnit) { + return FormatUtils.getTimeDuration(adminYieldDuration, timeUnit); + } + + private class EventDrivenTask implements Runnable { + + private final EventDrivenWorkerQueue workerQueue; + + public EventDrivenTask(final EventDrivenWorkerQueue workerQueue) { + this.workerQueue = workerQueue; + } + + @Override + public void run() { + while (!flowEngine.isShutdown()) { + final EventBasedWorker worker = workerQueue.poll(1, TimeUnit.SECONDS); + if (worker == null) { + continue; + } + final Connectable connectable = worker.getConnectable(); + final ScheduleState scheduleState = scheduleStates.get(connectable); + if (scheduleState == null) { + // Component not yet scheduled to run but has received events + continue; + } + + // get the connection index for this worker + AtomicLong connectionIndex = connectionIndexMap.get(connectable); + if (connectionIndex == null) { + connectionIndex = new AtomicLong(0L); + final AtomicLong existingConnectionIndex = connectionIndexMap.putIfAbsent(connectable, connectionIndex); + if (existingConnectionIndex != null) { + connectionIndex = existingConnectionIndex; + } + } + + final ProcessContext context = contextFactory.newProcessContext(connectable, connectionIndex); + + if (connectable instanceof ProcessorNode) { + final ProcessorNode procNode = (ProcessorNode) connectable; + final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor); + + final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS); + final ProcessSessionFactory sessionFactory; + final StandardProcessSession rawSession; + final boolean batch; + if (procNode.isHighThroughputSupported() && runNanos > 0L) { + rawSession = new StandardProcessSession(context); + sessionFactory = new BatchingSessionFactory(rawSession); + batch = true; + } else { + rawSession = null; + sessionFactory = new StandardProcessSessionFactory(context); + batch = false; + } + + final long startNanos = System.nanoTime(); + final long finishNanos = startNanos + runNanos; + int invocationCount = 0; + boolean shouldRun = true; + + try { + while (shouldRun) { + trigger(procNode, context, scheduleState, standardProcessContext, sessionFactory); + invocationCount++; + + if (!batch) { + break; + } + if (System.nanoTime() > finishNanos) { + break; + } + if (!scheduleState.isScheduled()) { + break; + } + + final int eventCount = worker.decrementEventCount(); + if (eventCount < 0) { + worker.incrementEventCount(); + } + shouldRun = (eventCount > 0); + } + } finally { + if (batch && rawSession != null) { + try { + rawSession.commit(); + } catch (final RuntimeException re) { + logger.error("Unable to commit process session", re); + } + } + try { + final long processingNanos = System.nanoTime() - startNanos; + final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(connectable.getIdentifier()); + procEvent.setProcessingNanos(processingNanos); + procEvent.setInvocations(invocationCount); + context.getFlowFileEventRepository().updateRepository(procEvent); + } catch (final IOException e) { + logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable, e.toString()); + logger.error("", e); + } + } + + // If the Processor has FlowFiles, go ahead and register it to run again. + // We do this because it's possible (and fairly common) for a Processor to be triggered and then determine, + // for whatever reason, that it is not ready to do anything and as a result simply returns without pulling anything + // off of its input queue. + // In this case, we could just say that the Processor shouldn't be Event-Driven, but then it becomes very complex and + // confusing to determine whether or not a Processor is really Event-Driven. So, the solution that we will use at this + // point is to register the Processor to run again. + if (Connectables.flowFilesQueued(procNode)) { + onEvent(procNode); + } + } else { + final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context); + final ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, encryptor); + trigger(connectable, scheduleState, connectableProcessContext, sessionFactory); + + // See explanation above for the ProcessorNode as to why we do this. + if (Connectables.flowFilesQueued(connectable)) { + onEvent(connectable); + } + } + } + } + ++ @SuppressWarnings("deprecation") + private void trigger(final Connectable worker, final ScheduleState scheduleState, final ConnectableProcessContext processContext, final ProcessSessionFactory sessionFactory) { + final int newThreadCount = scheduleState.incrementActiveThreadCount(); + if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) { + // its possible that the worker queue could give us a worker node that is eligible to run based + // on the number of threads but another thread has already incremented the thread count, result in + // reaching the maximum number of threads. we won't know this until we atomically increment the thread count + // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would + // result in using more than the maximum number of defined threads + scheduleState.decrementActiveThreadCount(); + return; + } + + try { + try (final AutoCloseable ncl = NarCloseable.withNarLoader()) { + worker.onTrigger(processContext, sessionFactory); + } catch (final ProcessException pe) { + logger.error("{} failed to process session due to {}", worker, pe.toString()); + } catch (final Throwable t) { + logger.error("{} failed to process session due to {}", worker, t.toString()); + logger.error("", t); + + logger.warn("{} Administratively Pausing for {} due to processing failure: {}", worker, getAdministrativeYieldDuration(), t.toString()); + logger.warn("", t); + try { + Thread.sleep(FormatUtils.getTimeDuration(adminYieldDuration, TimeUnit.MILLISECONDS)); + } catch (final InterruptedException e) { + } + + } + } finally { + if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) { + try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, worker, processContext); ++ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, worker, processContext); + } + } + + scheduleState.decrementActiveThreadCount(); + } + } + + private void trigger(final ProcessorNode worker, final ProcessContext context, final ScheduleState scheduleState, final StandardProcessContext processContext, final ProcessSessionFactory sessionFactory) { + final int newThreadCount = scheduleState.incrementActiveThreadCount(); + if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) { + // its possible that the worker queue could give us a worker node that is eligible to run based + // on the number of threads but another thread has already incremented the thread count, result in + // reaching the maximum number of threads. we won't know this until we atomically increment the thread count + // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would + // result in using more than the maximum number of defined threads + scheduleState.decrementActiveThreadCount(); + return; + } + + try { + try (final AutoCloseable ncl = NarCloseable.withNarLoader()) { + worker.onTrigger(processContext, sessionFactory); + } catch (final ProcessException pe) { + final ProcessorLog procLog = new SimpleProcessLogger(worker.getIdentifier(), worker.getProcessor()); + procLog.error("Failed to process session due to {}", new Object[]{pe}); + } catch (final Throwable t) { + // Use ProcessorLog to log the event so that a bulletin will be created for this processor + final ProcessorLog procLog = new SimpleProcessLogger(worker.getIdentifier(), worker.getProcessor()); + procLog.error("{} failed to process session due to {}", new Object[]{worker.getProcessor(), t}); + procLog.warn("Processor Administratively Yielded for {} due to processing failure", new Object[]{adminYieldDuration}); + logger.warn("Administratively Yielding {} due to uncaught Exception: ", worker.getProcessor()); + logger.warn("", t); + + worker.yield(FormatUtils.getTimeDuration(adminYieldDuration, TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); + } + } finally { + // if the processor is no longer scheduled to run and this is the last thread, + // invoke the OnStopped methods + if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) { + try (final NarCloseable x = NarCloseable.withNarLoader()) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, worker.getProcessor(), processContext); + } + } + + scheduleState.decrementActiveThreadCount(); + } + } + } + }
