http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java index 8deda3c..65756f4 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java @@ -103,7 +103,7 @@ public class VolatileContentRepository implements ContentRepository { private final ConcurrentMap<ContentClaim, ContentClaim> backupRepoClaimMap = new ConcurrentHashMap<>(256); private final AtomicReference<ContentRepository> backupRepositoryRef = new AtomicReference<>(null); - private ContentClaimManager claimManager; // effectively final + private ContentClaimManager claimManager; // effectively final public VolatileContentRepository() { this(NiFiProperties.getInstance()); @@ -137,7 +137,7 @@ public class VolatileContentRepository implements ContentRepository { public void initialize(final ContentClaimManager claimManager) { this.claimManager = claimManager; } - + @Override public void shutdown() { executor.shutdown(); @@ -147,7 +147,7 @@ public class VolatileContentRepository implements ContentRepository { * Specifies a Backup Repository where data should be written if this * Repository fills up * - * @param backup + * @param backup repo backup */ public void setBackupRepository(final ContentRepository backup) { final boolean updated = backupRepositoryRef.compareAndSet(null, backup);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java index a7020a6..9e429d6 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java @@ -34,7 +34,7 @@ import org.apache.nifi.controller.repository.claim.ContentClaimManager; public class VolatileFlowFileRepository implements FlowFileRepository { private final AtomicLong idGenerator = new AtomicLong(0L); - private ContentClaimManager claimManager; // effectively final + private ContentClaimManager claimManager; // effectively final @Override public void initialize(final ContentClaimManager claimManager) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index 292c258..0779c4d 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -102,7 +102,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis // synced with disk. // // This is required due to the following scenario, which could exist if we did not do this: - // + // // A Processor modifies a FlowFile (whose content is in ContentClaim A), writing the new content to ContentClaim B. // The processor removes ContentClaim A, which deletes the backing file. // The FlowFile Repository writes out this change but has not yet synced the update to disk. @@ -112,12 +112,12 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis // ContentClaim A does not exist anymore because the Session Commit destroyed the data. // This results in Data Loss! // However, the comment in the class's JavaDocs regarding sync'ing should also be considered. - // + // // In order to avoid this, instead of destroying ContentClaim A, the ProcessSession puts the claim on the Claim Destruction Queue. // We periodically force a sync of the FlowFile Repository to the backing storage mechanism. // We can then destroy the data. If we end up syncing the FlowFile Repository to the backing storage mechanism and then restart // before the data is destroyed, it's okay because the data will be unknown to the Content Repository, so it will be destroyed - // on restart. + // on restart. private final ConcurrentMap<Integer, BlockingQueue<ContentClaim>> claimsAwaitingDestruction = new ConcurrentHashMap<>(); public WriteAheadFlowFileRepository() { @@ -129,7 +129,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis flowFileRepositoryPath = properties.getFlowFileRepositoryPath(); numPartitions = properties.getFlowFileRepositoryPartitions(); checkpointDelayMillis = FormatUtils.getTimeDuration(properties.getFlowFileRepositoryCheckpointInterval(), TimeUnit.MILLISECONDS); - + checkpointExecutor = Executors.newSingleThreadScheduledExecutor(); } @@ -267,9 +267,9 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis * the specified Swap File and returns the number of FlowFiles that were * persisted. * - * @param queue - * @param swapLocation - * @throws IOException + * @param queue queue to swap out + * @param swapLocation location to swap to + * @throws IOException ioe */ @Override public void swapFlowFilesOut(final List<FlowFileRecord> swappedOut, final FlowFileQueue queue, final String swapLocation) throws IOException { @@ -289,14 +289,6 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", new Object[]{swappedOut.size(), queue, swapLocation}); } - /** - * Swaps FlowFiles into memory space from the given Swap File - * - * @param swapLocation - * @param swapRecords - * @param queue - * @throws IOException - */ @Override public void swapFlowFilesIn(final String swapLocation, final List<FlowFileRecord> swapRecords, final FlowFileQueue queue) throws IOException { final List<RepositoryRecord> repoRecords = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java index c43f3fe..54a1b2c 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java @@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger; * <p> * Must be thread safe</p> * - * @author none */ public final class StandardContentClaim implements ContentClaim, Comparable<ContentClaim> { @@ -38,14 +37,6 @@ public final class StandardContentClaim implements ContentClaim, Comparable<Cont private final AtomicInteger claimantCount = new AtomicInteger(0); private final int hashCode; - /** - * Constructs a content claim - * - * @param container - * @param section - * @param id - * @param lossTolerant - */ StandardContentClaim(final String container, final String section, final String id, final boolean lossTolerant) { this.container = container.intern(); this.section = section.intern(); @@ -100,7 +91,7 @@ public final class StandardContentClaim implements ContentClaim, Comparable<Cont * Provides the natural ordering for ContentClaim objects. By default they * are sorted by their id, then container, then section * - * @param other + * @param other other claim * @return x such that x <=1 if this is less than other; * x=0 if this.equals(other); * x >= 1 if this is greater than other http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java index 43bbb5a..b68f95e 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java @@ -34,16 +34,6 @@ public class StandardContentClaimManager implements ContentClaimManager { private static final BlockingQueue<ContentClaim> destructableClaims = new LinkedBlockingQueue<>(50000); - /** - * Creates a new Content Claim with the given id, container, section, and - * loss tolerance. - * - * @param id - * @param container - * @param section - * @param lossTolerant - * @return - */ @Override public ContentClaim newContentClaim(final String container, final String section, final String id, final boolean lossTolerant) { return new StandardContentClaim(container, section, id, lossTolerant); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java index f7da136..4e727e9 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java @@ -40,9 +40,7 @@ public class ByteCountingOutputStream extends OutputStream { write(b, 0, b.length); } - ; - - @Override + @Override public void write(byte[] b, int off, int len) throws IOException { out.write(b, off, len); bytesWrittenHolder.increment(len); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/FlowFileAccessInputStream.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/FlowFileAccessInputStream.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/FlowFileAccessInputStream.java index f349887..a710070 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/FlowFileAccessInputStream.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/FlowFileAccessInputStream.java @@ -56,10 +56,8 @@ public class FlowFileAccessInputStream extends FilterInputStream { } /** - * Returns the ContentNotFoundException that was thrown by this stream, or - * <code>null</code> if no such Exception was thrown. - * - * @return + * @return the ContentNotFoundException that was thrown by this stream, or + * <code>null</code> if no such Exception was thrown */ public ContentNotFoundException getContentNotFoundException() { return thrown; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java index acb3a01..01285b0 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java @@ -179,14 +179,14 @@ public class ConnectableProcessContext implements ProcessContext { @Override public Set<Relationship> getAvailableRelationships() { - for ( final Connection connection : connectable.getConnections() ) { - if ( connection.getFlowFileQueue().isFull() ) { + for (final Connection connection : connectable.getConnections()) { + if (connection.getFlowFileQueue().isFull()) { return Collections.emptySet(); } } - + final Collection<Relationship> relationships = connectable.getRelationships(); - if ( relationships instanceof Set ) { + if (relationships instanceof Set) { return (Set<Relationship>) relationships; } return new HashSet<>(connectable.getRelationships()); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java index 7455bf8..77ae686 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java @@ -65,7 +65,8 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent { private final ConcurrentMap<Connectable, AtomicLong> connectionIndexMap = new ConcurrentHashMap<>(); private final ConcurrentMap<Connectable, ScheduleState> scheduleStates = new ConcurrentHashMap<>(); - public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider flowController, final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor) { + public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider flowController, + final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor) { this.flowEngine = flowEngine; this.controllerServiceProvider = flowController; this.workerQueue = workerQueue; @@ -265,15 +266,15 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent { private void trigger(final Connectable worker, final ScheduleState scheduleState, final ConnectableProcessContext processContext, final ProcessSessionFactory sessionFactory) { final int newThreadCount = scheduleState.incrementActiveThreadCount(); if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) { - // its possible that the worker queue could give us a worker node that is eligible to run based - // on the number of threads but another thread has already incremented the thread count, result in - // reaching the maximum number of threads. we won't know this until we atomically increment the thread count - // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would - // result in using more than the maximum number of defined threads - scheduleState.decrementActiveThreadCount(); - return; + // its possible that the worker queue could give us a worker node that is eligible to run based + // on the number of threads but another thread has already incremented the thread count, result in + // reaching the maximum number of threads. we won't know this until we atomically increment the thread count + // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would + // result in using more than the maximum number of defined threads + scheduleState.decrementActiveThreadCount(); + return; } - + try { try (final AutoCloseable ncl = NarCloseable.withNarLoader()) { worker.onTrigger(processContext, sessionFactory); @@ -302,18 +303,19 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent { } } - private void trigger(final ProcessorNode worker, final ProcessContext context, final ScheduleState scheduleState, final StandardProcessContext processContext, final ProcessSessionFactory sessionFactory) { + private void trigger(final ProcessorNode worker, final ProcessContext context, final ScheduleState scheduleState, + final StandardProcessContext processContext, final ProcessSessionFactory sessionFactory) { final int newThreadCount = scheduleState.incrementActiveThreadCount(); if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) { - // its possible that the worker queue could give us a worker node that is eligible to run based - // on the number of threads but another thread has already incremented the thread count, result in - // reaching the maximum number of threads. we won't know this until we atomically increment the thread count - // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would - // result in using more than the maximum number of defined threads - scheduleState.decrementActiveThreadCount(); - return; + // its possible that the worker queue could give us a worker node that is eligible to run based + // on the number of threads but another thread has already incremented the thread count, result in + // reaching the maximum number of threads. we won't know this until we atomically increment the thread count + // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would + // result in using more than the maximum number of defined threads + scheduleState.decrementActiveThreadCount(); + return; } - + try { try (final AutoCloseable ncl = NarCloseable.withNarLoader()) { worker.onTrigger(processContext, sessionFactory); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java index 3355e73..4278cee 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java @@ -135,7 +135,7 @@ public class QuartzSchedulingAgent implements SchedulingAgent { final Callable<Boolean> continuallyRunTask; if (connectable.getConnectableType() == ConnectableType.PROCESSOR) { final ProcessorNode procNode = (ProcessorNode) connectable; - + final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor); ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, standardProcContext); continuallyRunTask = runnableTask; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java index ff17912..cb7f55f 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java @@ -28,7 +28,7 @@ public class ScheduleState { private final AtomicInteger activeThreadCount = new AtomicInteger(0); private final AtomicBoolean scheduled = new AtomicBoolean(false); - private final Set<ScheduledFuture<?>> futures = new HashSet<ScheduledFuture<?>>(); + private final Set<ScheduledFuture<?>> futures = new HashSet<>(); private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false); private volatile long lastStopTime = -1; @@ -78,7 +78,7 @@ public class ScheduleState { * Establishes the list of relevant futures for this processor. Replaces any * previously held futures. * - * @param newFutures + * @param newFutures futures */ public synchronized void setFutures(final Collection<ScheduledFuture<?>> newFutures) { futures.clear(); @@ -89,7 +89,7 @@ public class ScheduleState { futures.remove(oldFuture); futures.add(newFuture); } - + public synchronized Set<ScheduledFuture<?>> getFutures() { return Collections.unmodifiableSet(futures); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 7725823..bb565cb 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -147,12 +147,11 @@ public final class StandardProcessScheduler implements ProcessScheduler { LOG.error("", t); } } - + frameworkTaskExecutor.shutdown(); componentLifeCycleThreadPool.shutdown(); } - @Override public void schedule(final ReportingTaskNode taskNode) { final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode)); @@ -184,13 +183,13 @@ public final class StandardProcessScheduler implements ProcessScheduler { try (final NarCloseable x = NarCloseable.withNarLoader()) { ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, OnScheduled.class, reportingTask, taskNode.getConfigurationContext()); } - + break; } catch (final Exception e) { final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e; final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask); componentLog.error("Failed to invoke @OnEnabled method due to {}", cause); - + LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e); try { @@ -208,20 +207,19 @@ public final class StandardProcessScheduler implements ProcessScheduler { taskNode.setScheduledState(ScheduledState.RUNNING); } - @Override public void unschedule(final ReportingTaskNode taskNode) { final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode)); if (!scheduleState.isScheduled()) { return; } - + taskNode.verifyCanStop(); final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy()); final ReportingTask reportingTask = taskNode.getReportingTask(); scheduleState.setScheduled(false); taskNode.setScheduledState(ScheduledState.STOPPED); - + final Runnable unscheduleReportingTaskRunnable = new Runnable() { @SuppressWarnings("deprecation") @Override @@ -240,7 +238,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", reportingTask, cause.toString(), administrativeYieldDuration); LOG.error("", cause); - + try { Thread.sleep(administrativeYieldMillis); } catch (final InterruptedException ie) { @@ -293,32 +291,32 @@ public final class StandardProcessScheduler implements ProcessScheduler { final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor); final Set<String> serviceIds = new HashSet<>(); - for ( final PropertyDescriptor descriptor : processContext.getProperties().keySet() ) { + for (final PropertyDescriptor descriptor : processContext.getProperties().keySet()) { final Class<? extends ControllerService> serviceDefinition = descriptor.getControllerServiceDefinition(); - if ( serviceDefinition != null ) { + if (serviceDefinition != null) { final String serviceId = processContext.getProperty(descriptor).getValue(); - if ( serviceId != null ) { - serviceIds.add(serviceId); + if (serviceId != null) { + serviceIds.add(serviceId); } } } - - attemptOnScheduled: while (true) { + + attemptOnScheduled: + while (true) { try { synchronized (scheduleState) { - for ( final String serviceId : serviceIds ) { + for (final String serviceId : serviceIds) { final boolean enabled = processContext.isControllerServiceEnabled(serviceId); - if ( !enabled ) { + if (!enabled) { LOG.debug("Controller Service with ID {} is not yet enabled, so will not start {} yet", serviceId, procNode); Thread.sleep(administrativeYieldMillis); continue attemptOnScheduled; } } - + // if no longer scheduled to run, then we're finished. This can happen, for example, - // if the @OnScheduled method throws an Exception and the user stops the processor + // if the @OnScheduled method throws an Exception and the user stops the processor // while we're administratively yielded. - // // we also check if the schedule state's last start time is equal to what it was before. // if not, then means that the processor has been stopped and started again, so we should just // bail; another thread will be responsible for invoking the @OnScheduled methods. @@ -363,12 +361,6 @@ public final class StandardProcessScheduler implements ProcessScheduler { componentLifeCycleThreadPool.execute(startProcRunnable); } - /** - * Used to delay scheduling the given Processor to run until its yield - * duration expires. - * - * @param procNode - */ @Override public void yield(final ProcessorNode procNode) { // This exists in the ProcessScheduler so that the scheduler can take advantage of the fact that @@ -381,7 +373,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { // the context. If this Processor has X number of threads, we end up submitting X new tasks while the previous // X-1 tasks are still running. At this point, another thread could finish and do the same thing, resulting in // an additional X-1 extra tasks being submitted. - // + // // As a result, we simply removed this buggy implementation, as it was a very minor performance optimization // that gave very bad results. } @@ -431,24 +423,11 @@ public final class StandardProcessScheduler implements ProcessScheduler { getSchedulingAgent(worker).onEvent(worker); } - /** - * Returns the number of threads that are currently active for the given - * <code>Connectable</code>. - * - * @return - */ @Override public int getActiveThreadCount(final Object scheduled) { return getScheduleState(scheduled).getActiveThreadCount(); } - /** - * Begins scheduling the given port to run. - * - * @throws NullPointerException if the Port is null - * @throws IllegalStateException if the Port is already scheduled to run or - * has threads running - */ @Override public void startPort(final Port port) { if (!port.isValid()) { @@ -501,7 +480,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { if (!state.isScheduled()) { return; } - + state.setScheduled(false); getSchedulingAgent(connectable).unschedule(connectable, state); @@ -561,7 +540,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { if (procNode.getScheduledState() != ScheduledState.DISABLED) { throw new IllegalStateException("Processor cannot be enabled because it is not disabled"); } - + procNode.setScheduledState(ScheduledState.STOPPED); } @@ -570,21 +549,22 @@ public final class StandardProcessScheduler implements ProcessScheduler { if (procNode.getScheduledState() != ScheduledState.STOPPED) { throw new IllegalStateException("Processor cannot be disabled because its state is set to " + procNode.getScheduledState()); } - + procNode.setScheduledState(ScheduledState.DISABLED); } public synchronized void enableReportingTask(final ReportingTaskNode taskNode) { - if ( taskNode.getScheduledState() != ScheduledState.DISABLED ) { + if (taskNode.getScheduledState() != ScheduledState.DISABLED) { throw new IllegalStateException("Reporting Task cannot be enabled because it is not disabled"); } taskNode.setScheduledState(ScheduledState.STOPPED); } - + public synchronized void disableReportingTask(final ReportingTaskNode taskNode) { - if ( taskNode.getScheduledState() != ScheduledState.STOPPED ) { - throw new IllegalStateException("Reporting Task cannot be disabled because its state is set to " + taskNode.getScheduledState() + " but transition to DISABLED state is allowed only from the STOPPED state"); + if (taskNode.getScheduledState() != ScheduledState.STOPPED) { + throw new IllegalStateException("Reporting Task cannot be disabled because its state is set to " + taskNode.getScheduledState() + + " but transition to DISABLED state is allowed only from the STOPPED state"); } taskNode.setScheduledState(ScheduledState.DISABLED); @@ -597,12 +577,12 @@ public final class StandardProcessScheduler implements ProcessScheduler { } /** - * Returns the ScheduleState that is registered for the given component; - * if no ScheduleState current is registered, one is created and registered + * Returns the ScheduleState that is registered for the given component; if + * no ScheduleState current is registered, one is created and registered * atomically, and then that value is returned. * - * @param schedulable - * @return + * @param schedulable schedulable + * @return scheduled state */ private ScheduleState getScheduleState(final Object schedulable) { ScheduleState scheduleState = scheduleStates.get(schedulable); @@ -620,21 +600,21 @@ public final class StandardProcessScheduler implements ProcessScheduler { public void enableControllerService(final ControllerServiceNode service) { service.setState(ControllerServiceState.ENABLING); final ScheduleState scheduleState = getScheduleState(service); - + final Runnable enableRunnable = new Runnable() { @Override public void run() { try (final NarCloseable x = NarCloseable.withNarLoader()) { long lastStopTime = scheduleState.getLastStopTime(); final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider); - + while (true) { try { synchronized (scheduleState) { // if no longer enabled, then we're finished. This can happen, for example, // if the @OnEnabled method throws an Exception and the user disables the service // while we're administratively yielded. - // + // // we also check if the schedule state's last stop time is equal to what it was before. // if not, then means that the service has been disabled and enabled again, so we should just // bail; another thread will be responsible for invoking the @OnEnabled methods. @@ -649,11 +629,11 @@ public final class StandardProcessScheduler implements ProcessScheduler { } } catch (final Exception e) { final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e; - + final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service); componentLog.error("Failed to invoke @OnEnabled method due to {}", cause); LOG.error("Failed to invoke @OnEnabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString()); - if ( LOG.isDebugEnabled() ) { + if (LOG.isDebugEnabled()) { LOG.error("", cause); } @@ -666,15 +646,15 @@ public final class StandardProcessScheduler implements ProcessScheduler { final Throwable cause = (t instanceof InvocationTargetException) ? t.getCause() : t; final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service); componentLog.error("Failed to invoke @OnEnabled method due to {}", cause); - + LOG.error("Failed to invoke @OnEnabled method on {} due to {}", service.getControllerServiceImplementation(), cause.toString()); - if ( LOG.isDebugEnabled() ) { + if (LOG.isDebugEnabled()) { LOG.error("", cause); } } } }; - + scheduleState.setScheduled(true); componentLifeCycleThreadPool.execute(enableRunnable); } @@ -682,7 +662,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { @Override public void disableControllerService(final ControllerServiceNode service) { service.verifyCanDisable(); - + final ScheduleState state = getScheduleState(requireNonNull(service)); final Runnable disableRunnable = new Runnable() { @Override @@ -693,8 +673,8 @@ public final class StandardProcessScheduler implements ProcessScheduler { try (final NarCloseable x = NarCloseable.withNarLoader()) { final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider); - - while(true) { + + while (true) { try { ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext); heartbeater.heartbeat(); @@ -704,17 +684,18 @@ public final class StandardProcessScheduler implements ProcessScheduler { final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e; final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service); componentLog.error("Failed to invoke @OnDisabled method due to {}", cause); - + LOG.error("Failed to invoke @OnDisabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString()); - if ( LOG.isDebugEnabled() ) { + if (LOG.isDebugEnabled()) { LOG.error("", cause); } - + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext); try { Thread.sleep(administrativeYieldMillis); - } catch (final InterruptedException ie) {} - + } catch (final InterruptedException ie) { + } + continue; } } @@ -723,6 +704,6 @@ public final class StandardProcessScheduler implements ProcessScheduler { }; service.setState(ControllerServiceState.DISABLING); - componentLifeCycleThreadPool.execute(disableRunnable); + componentLifeCycleThreadPool.execute(disableRunnable); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java index f3eecbd..c4e6609 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java @@ -42,9 +42,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TimerDrivenSchedulingAgent implements SchedulingAgent { + private static final Logger logger = LoggerFactory.getLogger(TimerDrivenSchedulingAgent.class); private final long noWorkYieldNanos; - + private final FlowController flowController; private final FlowEngine flowEngine; private final ProcessContextFactory contextFactory; @@ -57,7 +58,7 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent { this.flowEngine = flowEngine; this.contextFactory = contextFactory; this.encryptor = encryptor; - + final String boredYieldDuration = NiFiProperties.getInstance().getBoredYieldDuration(); try { noWorkYieldNanos = FormatUtils.getTimeDuration(boredYieldDuration, TimeUnit.NANOSECONDS); @@ -84,31 +85,30 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent { logger.info("{} started.", taskNode.getReportingTask()); } - @Override public void schedule(final Connectable connectable, final ScheduleState scheduleState) { - + final List<ScheduledFuture<?>> futures = new ArrayList<>(); for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) { final Callable<Boolean> continuallyRunTask; final ProcessContext processContext; - + // Determine the task to run and create it. if (connectable.getConnectableType() == ConnectableType.PROCESSOR) { final ProcessorNode procNode = (ProcessorNode) connectable; final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor); - final ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, + final ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, standardProcContext); - + continuallyRunTask = runnableTask; processContext = standardProcContext; } else { processContext = new ConnectableProcessContext(connectable, encryptor); continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, processContext); } - + final AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<>(); - + final Runnable yieldDetectionRunnable = new Runnable() { @Override public void run() { @@ -122,50 +122,50 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent { } catch (final Exception e) { throw new ProcessException(e); } - + // If the component is yielded, cancel its future and re-submit it to run again // after the yield has expired. final long newYieldExpiration = connectable.getYieldExpiration(); - if ( newYieldExpiration > System.currentTimeMillis() ) { + if (newYieldExpiration > System.currentTimeMillis()) { final long yieldMillis = System.currentTimeMillis() - newYieldExpiration; final ScheduledFuture<?> scheduledFuture = futureRef.get(); - if ( scheduledFuture == null ) { + if (scheduledFuture == null) { return; } - + // If we are able to cancel the future, create a new one and update the ScheduleState so that it has // an accurate accounting of which futures are outstanding; we must then also update the futureRef // so that we can do this again the next time that the component is yielded. if (scheduledFuture.cancel(false)) { final long yieldNanos = TimeUnit.MILLISECONDS.toNanos(yieldMillis); - + synchronized (scheduleState) { - if ( scheduleState.isScheduled() ) { - final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, yieldNanos, + if (scheduleState.isScheduled()) { + final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, yieldNanos, connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); - + scheduleState.replaceFuture(scheduledFuture, newFuture); futureRef.set(newFuture); } } } - } else if ( noWorkYieldNanos > 0L && shouldYield ) { + } else if (noWorkYieldNanos > 0L && shouldYield) { // Component itself didn't yield but there was no work to do, so the framework will choose // to yield the component automatically for a short period of time. final ScheduledFuture<?> scheduledFuture = futureRef.get(); - if ( scheduledFuture == null ) { + if (scheduledFuture == null) { return; } - + // If we are able to cancel the future, create a new one and update the ScheduleState so that it has // an accurate accounting of which futures are outstanding; we must then also update the futureRef // so that we can do this again the next time that the component is yielded. if (scheduledFuture.cancel(false)) { synchronized (scheduleState) { - if ( scheduleState.isScheduled() ) { - final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, noWorkYieldNanos, + if (scheduleState.isScheduled()) { + final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, noWorkYieldNanos, connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); - + scheduleState.replaceFuture(scheduledFuture, newFuture); futureRef.set(newFuture); } @@ -176,13 +176,13 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent { }; // Schedule the task to run - final ScheduledFuture<?> future = flowEngine.scheduleWithFixedDelay(yieldDetectionRunnable, 0L, + final ScheduledFuture<?> future = flowEngine.scheduleWithFixedDelay(yieldDetectionRunnable, 0L, connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); - + // now that we have the future, set the atomic reference so that if the component is yielded we // are able to then cancel this future. futureRef.set(future); - + // Keep track of the futures so that we can update the ScheduleState. futures.add(future); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java index 1fde670..92fa3b2 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java @@ -43,15 +43,16 @@ import org.w3c.dom.Element; import org.xml.sax.SAXException; import org.xml.sax.SAXParseException; -/** - * - */ public class ControllerServiceLoader { private static final Logger logger = LoggerFactory.getLogger(ControllerServiceLoader.class); - - public static List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider, final InputStream serializedStream, final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) throws IOException { + public static List<ControllerServiceNode> loadControllerServices( + final ControllerServiceProvider provider, + final InputStream serializedStream, + final StringEncryptor encryptor, + final BulletinRepository bulletinRepo, + final boolean autoResumeState) throws IOException { final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance(); documentBuilderFactory.setNamespaceAware(true); @@ -87,66 +88,70 @@ public class ControllerServiceLoader { throw err; } }); - + final Document document = builder.parse(in); final Element controllerServices = document.getDocumentElement(); final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServices, "controllerService"); - return new ArrayList<ControllerServiceNode>(loadControllerServices(serviceElements, provider, encryptor, bulletinRepo, autoResumeState)); + return new ArrayList<>(loadControllerServices(serviceElements, provider, encryptor, bulletinRepo, autoResumeState)); } catch (SAXException | ParserConfigurationException sxe) { throw new IOException(sxe); } } - - public static Collection<ControllerServiceNode> loadControllerServices(final List<Element> serviceElements, final ControllerServiceProvider provider, final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) { + + public static Collection<ControllerServiceNode> loadControllerServices( + final List<Element> serviceElements, + final ControllerServiceProvider provider, + final StringEncryptor encryptor, + final BulletinRepository bulletinRepo, + final boolean autoResumeState) { final Map<ControllerServiceNode, Element> nodeMap = new HashMap<>(); - for ( final Element serviceElement : serviceElements ) { + for (final Element serviceElement : serviceElements) { final ControllerServiceNode serviceNode = createControllerService(provider, serviceElement, encryptor); - // We need to clone the node because it will be used in a separate thread below, and + // We need to clone the node because it will be used in a separate thread below, and // Element is not thread-safe. nodeMap.put(serviceNode, (Element) serviceElement.cloneNode(true)); } - for ( final Map.Entry<ControllerServiceNode, Element> entry : nodeMap.entrySet() ) { + for (final Map.Entry<ControllerServiceNode, Element> entry : nodeMap.entrySet()) { configureControllerService(entry.getKey(), entry.getValue(), encryptor); } - + // Start services - if ( autoResumeState ) { + if (autoResumeState) { final Set<ControllerServiceNode> nodesToEnable = new HashSet<>(); - - for ( final ControllerServiceNode node : nodeMap.keySet() ) { + + for (final ControllerServiceNode node : nodeMap.keySet()) { final Element controllerServiceElement = nodeMap.get(node); final ControllerServiceDTO dto; synchronized (controllerServiceElement.getOwnerDocument()) { dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); } - + final ControllerServiceState state = ControllerServiceState.valueOf(dto.getState()); if (state == ControllerServiceState.ENABLED) { nodesToEnable.add(node); } } - + provider.enableControllerServices(nodesToEnable); } - + return nodeMap.keySet(); } - - + private static ControllerServiceNode createControllerService(final ControllerServiceProvider provider, final Element controllerServiceElement, final StringEncryptor encryptor) { final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); - + final ControllerServiceNode node = provider.createControllerService(dto.getType(), dto.getId(), false); node.setName(dto.getName()); node.setComments(dto.getComments()); return node; } - + private static void configureControllerService(final ControllerServiceNode node, final Element controllerServiceElement, final StringEncryptor encryptor) { final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); node.setAnnotationData(dto.getAnnotationData()); - + for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) { if (entry.getValue() == null) { node.removeProperty(entry.getKey()); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java index 8d46b05..02d6263 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java @@ -64,15 +64,15 @@ public class StandardControllerServiceInitializationContext implements Controlle public boolean isControllerServiceEnabled(final ControllerService service) { return serviceProvider.isControllerServiceEnabled(service); } - + @Override public boolean isControllerServiceEnabling(String serviceIdentifier) { return serviceProvider.isControllerServiceEnabling(serviceIdentifier); } - + @Override public String getControllerServiceName(final String serviceIdentifier) { - return serviceProvider.getControllerServiceName(serviceIdentifier); + return serviceProvider.getControllerServiceName(serviceIdentifier); } @Override http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index e768b9a..e577ffe 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -59,12 +59,11 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i this.serviceProvider = serviceProvider; } - @Override public ControllerService getProxiedControllerService() { return proxedControllerService; } - + @Override public ControllerService getControllerServiceImplementation() { return implementation; @@ -106,23 +105,23 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently enabled. Please disable the Controller Service first."); } } - + @Override public void setProperty(final String name, final String value) { super.setProperty(name, value); onConfigured(); } - + @Override public boolean removeProperty(String name) { final boolean removed = super.removeProperty(name); - if ( removed ) { + if (removed) { onConfigured(); } - + return removed; } - + @SuppressWarnings("deprecation") private void onConfigured() { try (final NarCloseable x = NarCloseable.withNarLoader()) { @@ -132,97 +131,97 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i throw new ComponentLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + implementation, e); } } - + @Override public void verifyCanDelete() { - if ( getState() != ControllerServiceState.DISABLED ) { + if (getState() != ControllerServiceState.DISABLED) { throw new IllegalStateException(implementation + " cannot be deleted because it is not disabled"); } } - + @Override public void verifyCanDisable() { verifyCanDisable(Collections.<ControllerServiceNode>emptySet()); } - + @Override public void verifyCanDisable(final Set<ControllerServiceNode> ignoreReferences) { final ControllerServiceState state = getState(); - if ( state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING ) { + if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) { throw new IllegalStateException("Cannot disable " + getControllerServiceImplementation() + " because it is not enabled"); } - + final ControllerServiceReference references = getReferences(); - - for ( final ConfiguredComponent activeReference : references.getActiveReferences() ) { - if ( !ignoreReferences.contains(activeReference) ) { + + for (final ConfiguredComponent activeReference : references.getActiveReferences()) { + if (!ignoreReferences.contains(activeReference)) { throw new IllegalStateException(implementation + " cannot be disabled because it is referenced by at least one component that is currently running"); } } } - + @Override public void verifyCanEnable() { - if ( getState() != ControllerServiceState.DISABLED ) { + if (getState() != ControllerServiceState.DISABLED) { throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled"); } - - if ( !isValid() ) { + + if (!isValid()) { throw new IllegalStateException(implementation + " cannot be enabled because it is not valid: " + getValidationErrors()); } } - + @Override public void verifyCanEnable(final Set<ControllerServiceNode> ignoredReferences) { if (getState() != ControllerServiceState.DISABLED) { throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled"); } - + final Set<String> ids = new HashSet<>(); - for ( final ControllerServiceNode node : ignoredReferences ) { + for (final ControllerServiceNode node : ignoredReferences) { ids.add(node.getIdentifier()); } - + final Collection<ValidationResult> validationResults = getValidationErrors(ids); - for ( final ValidationResult result : validationResults ) { - if ( !result.isValid() ) { + for (final ValidationResult result : validationResults) { + if (!result.isValid()) { throw new IllegalStateException(implementation + " cannot be enabled because it is not valid: " + result); } } } - + @Override public void verifyCanUpdate() { - if ( getState() != ControllerServiceState.DISABLED ) { + if (getState() != ControllerServiceState.DISABLED) { throw new IllegalStateException(implementation + " cannot be updated because it is not disabled"); } } - + @Override public String getComments() { - readLock.lock(); - try { - return comment; - } finally { - readLock.unlock(); - } + readLock.lock(); + try { + return comment; + } finally { + readLock.unlock(); + } } - + @Override public void setComments(final String comment) { - writeLock.lock(); - try { - this.comment = comment; - } finally { - writeLock.unlock(); - } + writeLock.lock(); + try { + this.comment = comment; + } finally { + writeLock.unlock(); + } } - + @Override public ControllerServiceState getState() { return stateRef.get(); } - + @Override public void setState(final ControllerServiceState state) { this.stateRef.set(state);
