http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 355e303..eb7ec83 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -23,9 +23,6 @@ import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.controller.ProcessScheduler; -import org.apache.nifi.controller.ValidationContextFactory; -import org.apache.nifi.controller.ProcessorNode; import static java.util.Objects.requireNonNull; @@ -157,9 +154,11 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable sideEffectFree = procClass.isAnnotationPresent(SideEffectFree.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SideEffectFree.class); batchSupported = procClass.isAnnotationPresent(SupportsBatching.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SupportsBatching.class); triggeredSerially = procClass.isAnnotationPresent(TriggerSerially.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerSerially.class); - triggerWhenAnyDestinationAvailable = procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable.class); + triggerWhenAnyDestinationAvailable = procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class) + || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable.class); this.validationContextFactory = validationContextFactory; - eventDrivenSupported = (procClass.isAnnotationPresent(EventDriven.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.EventDriven.class) )&& !triggeredSerially && !triggerWhenEmpty; + eventDrivenSupported = (procClass.isAnnotationPresent(EventDriven.class) + || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.EventDriven.class)) && !triggeredSerially && !triggerWhenEmpty; schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN; } @@ -175,7 +174,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable * Provides and opportunity to retain information about this particular * processor instance * - * @param comments + * @param comments new comments */ @Override public void setComments(final String comments) { @@ -205,10 +204,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable this.position.set(position); } + @Override public Map<String, String> getStyle() { return style.get(); } + @Override public void setStyle(final Map<String, String> style) { if (style != null) { this.style.set(Collections.unmodifiableMap(new HashMap<>(style))); @@ -229,9 +230,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable return lossTolerant.get(); } - /** - * @return if true processor runs only on the primary node - */ + @Override public boolean isIsolated() { return isolated.get(); } @@ -249,6 +248,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable * @return true if the processor has the {@link SideEffectFree} annotation, * false otherwise. */ + @Override public boolean isSideEffectFree() { return sideEffectFree; } @@ -262,6 +262,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable * @return true if the processor has the * {@link TriggerWhenAnyDestinationAvailable} annotation, false otherwise. */ + @Override public boolean isTriggerWhenAnyDestinationAvailable() { return triggerWhenAnyDestinationAvailable; } @@ -270,7 +271,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable * Indicates whether flow file content made by this processor must be * persisted * - * @param lossTolerant + * @param lossTolerant tolerant */ @Override public void setLossTolerant(final boolean lossTolerant) { @@ -288,7 +289,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable /** * Indicates whether the processor runs on only the primary node. * - * @param isolated + * @param isolated isolated */ public void setIsolated(final boolean isolated) { writeLock.lock(); @@ -311,12 +312,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable return terminatable.contains(relationship); } - /** - * Indicates whether flow files transferred to undefined relationships - * should be terminated - * - * @param terminate - */ @Override public void setAutoTerminatedRelationships(final Set<Relationship> terminate) { writeLock.lock(); @@ -340,6 +335,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable * @return an unmodifiable Set that contains all of the * ProcessorRelationship objects that are configured to be auto-terminated */ + @Override public Set<Relationship> getAutoTerminatedRelationships() { Set<Relationship> relationships = undefinedRelationshipsToTerminate.get(); if (relationships == null) { @@ -361,16 +357,16 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable public String getProcessorDescription() { CapabilityDescription capDesc = processor.getClass().getAnnotation(CapabilityDescription.class); String description = null; - if ( capDesc != null ) { + if (capDesc != null) { description = capDesc.value(); } else { - final org.apache.nifi.processor.annotation.CapabilityDescription deprecatedCapDesc = - processor.getClass().getAnnotation(org.apache.nifi.processor.annotation.CapabilityDescription.class); - if ( deprecatedCapDesc != null ) { + final org.apache.nifi.processor.annotation.CapabilityDescription deprecatedCapDesc + = processor.getClass().getAnnotation(org.apache.nifi.processor.annotation.CapabilityDescription.class); + if (deprecatedCapDesc != null) { description = deprecatedCapDesc.value(); } } - + return description; } @@ -399,6 +395,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS); } + @Override public boolean isEventDrivenSupported() { readLock.lock(); try { @@ -411,11 +408,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable /** * Updates the Scheduling Strategy used for this Processor * - * @param schedulingStrategy + * @param schedulingStrategy strategy * * @throws IllegalArgumentException if the SchedulingStrategy is not not * applicable for this Processor */ + @Override public void setSchedulingStrategy(final SchedulingStrategy schedulingStrategy) { writeLock.lock(); try { @@ -434,10 +432,9 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } /** - * Returns the currently configured scheduling strategy - * - * @return + * @return the currently configured scheduling strategy */ + @Override public SchedulingStrategy getSchedulingStrategy() { readLock.lock(); try { @@ -452,12 +449,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable return schedulingPeriod.get(); } - /** - * @param value the number of <code>timeUnit</code>s between scheduling - * intervals. - * @param timeUnit determines the unit of time to represent the scheduling - * period. - */ @Override public void setScheduldingPeriod(final String schedulingPeriod) { writeLock.lock(); @@ -519,28 +510,16 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } } - /** - * @param timeUnit determines the unit of time to represent the yield - * period. If null will be reported in units of - * {@link #DEFAULT_SCHEDULING_TIME_UNIT}. - * @return - */ @Override public long getYieldPeriod(final TimeUnit timeUnit) { return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); } + @Override public String getYieldPeriod() { return yieldPeriod.get(); } - /** - * Updates the amount of time that this processor should avoid being - * scheduled when the processor calls - * {@link nifi.processor.ProcessContext#yield() ProcessContext.yield()} - * - * @param yieldPeriod - */ @Override public void setYieldPeriod(final String yieldPeriod) { writeLock.lock(); @@ -573,6 +552,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable LoggerFactory.getLogger(processor.getClass()).debug("{} has chosen to yield its resources; will not be scheduled to run again for {}", processor, yieldDuration); } + @Override public void yield(final long period, final TimeUnit timeUnit) { final long yieldMillis = TimeUnit.MILLISECONDS.convert(period, timeUnit); yieldExpiration.set(Math.max(yieldExpiration.get(), System.currentTimeMillis() + yieldMillis)); @@ -642,6 +622,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } } + @Override public boolean isTriggeredSerially() { return triggeredSerially; } @@ -655,10 +636,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable return concurrentTaskCount.get(); } + @Override public LogLevel getBulletinLevel() { return LogRepositoryFactory.getRepository(getIdentifier()).getObservationLevel(BULLETIN_OBSERVER_ID); } + @Override public void setBulletinLevel(final LogLevel level) { LogRepositoryFactory.getRepository(getIdentifier()).setObservationLevel(BULLETIN_OBSERVER_ID, level); } @@ -778,7 +761,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable // if we are running and we do not terminate undefined relationships and this is the only // connection that defines the given relationship, and that relationship is required, // then it is not legal to remove this relationship from this connection. - throw new IllegalStateException("Cannot remove relationship " + rel.getName() + " from Connection because doing so would invalidate Processor " + this + ", which is currently running"); + throw new IllegalStateException("Cannot remove relationship " + rel.getName() + " from Connection because doing so would invalidate Processor " + + this + ", which is currently running"); } } } @@ -872,11 +856,9 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } /** - * Gets the relationship for this nodes processor for the given name or - * creates a new relationship for the given name. - * - * @param relationshipName - * @return + * @param relationshipName name + * @return the relationship for this nodes processor for the given name or + * creates a new relationship for the given name */ @Override public Relationship getRelationship(final String relationshipName) { @@ -897,15 +879,14 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable return returnRel; } + @Override public Processor getProcessor() { return this.processor; } /** - * Obtains the Set of destination processors for all relationships excluding + * @return the Set of destination processors for all relationships excluding * any destinations that are this processor itself (self-loops) - * - * @return */ public Set<Connectable> getDestinations() { final Set<Connectable> nonSelfDestinations = new HashSet<>(); @@ -965,7 +946,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable /** * Determines if the given node is a destination for this node * - * @param node + * @param node node * @return true if is a direct destination node; false otherwise */ boolean isRelated(final ProcessorNode node) { @@ -986,7 +967,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable readLock.unlock(); } } - + @Override public int getActiveThreadCount() { readLock.lock(); @@ -1067,8 +1048,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable /** * Establishes node equality (based on the processor's identifier) * - * @param other - * @return + * @param other node + * @return true if equal */ @Override public boolean equals(final Object other) { @@ -1125,6 +1106,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable return ConnectableType.PROCESSOR; } + @Override public void setScheduledState(final ScheduledState scheduledState) { this.scheduledState.set(scheduledState); if (!scheduledState.equals(ScheduledState.RUNNING)) { // if user stops processor, clear yield expiration @@ -1210,7 +1192,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable readLock.unlock(); } } - + @Override public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) { switch (getScheduledState()) { @@ -1222,15 +1204,15 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable break; } verifyNoActiveThreads(); - + final Set<String> ids = new HashSet<>(); - for ( final ControllerServiceNode node : ignoredReferences ) { + for (final ControllerServiceNode node : ignoredReferences) { ids.add(node.getIdentifier()); } - + final Collection<ValidationResult> validationResults = getValidationErrors(ids); - for ( final ValidationResult result : validationResults ) { - if ( !result.isValid() ) { + for (final ValidationResult result : validationResults) { + if (!result.isValid()) { throw new IllegalStateException(this + " cannot be started because it is not valid: " + result); } }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardSnippet.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardSnippet.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardSnippet.java index 2c9d85e..9206054 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardSnippet.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardSnippet.java @@ -33,20 +33,16 @@ public class StandardSnippet implements Snippet { private String parentGroupId; private Boolean linked; - private Set<String> processGroups = new HashSet<>(); - private Set<String> remoteProcessGroups = new HashSet<>(); - private Set<String> processors = new HashSet<>(); - private Set<String> inputPorts = new HashSet<>(); - private Set<String> outputPorts = new HashSet<>(); - private Set<String> connections = new HashSet<>(); - private Set<String> labels = new HashSet<>(); - private Set<String> funnels = new HashSet<>(); - - /** - * The id of this snippet. - * - * @return - */ + private final Set<String> processGroups = new HashSet<>(); + private final Set<String> remoteProcessGroups = new HashSet<>(); + private final Set<String> processors = new HashSet<>(); + private final Set<String> inputPorts = new HashSet<>(); + private final Set<String> outputPorts = new HashSet<>(); + private final Set<String> connections = new HashSet<>(); + private final Set<String> labels = new HashSet<>(); + private final Set<String> funnels = new HashSet<>(); + + @Override public String getId() { return id; } @@ -55,11 +51,7 @@ public class StandardSnippet implements Snippet { this.id = id; } - /** - * Whether or not this snippet is linked to the data flow. - * - * @return - */ + @Override public boolean isLinked() { if (linked == null) { return false; @@ -72,11 +64,7 @@ public class StandardSnippet implements Snippet { this.linked = linked; } - /** - * The parent group id of the components in this snippet. - * - * @return - */ + @Override public String getParentGroupId() { return parentGroupId; } @@ -85,11 +73,7 @@ public class StandardSnippet implements Snippet { this.parentGroupId = parentGroupId; } - /** - * The connections in this snippet. - * - * @return - */ + @Override public Set<String> getConnections() { return Collections.unmodifiableSet(connections); } @@ -98,11 +82,7 @@ public class StandardSnippet implements Snippet { connections.addAll(ids); } - /** - * The funnels in this snippet. - * - * @return - */ + @Override public Set<String> getFunnels() { return Collections.unmodifiableSet(funnels); } @@ -111,11 +91,7 @@ public class StandardSnippet implements Snippet { funnels.addAll(ids); } - /** - * The input ports in this snippet. - * - * @return - */ + @Override public Set<String> getInputPorts() { return Collections.unmodifiableSet(inputPorts); } @@ -124,11 +100,7 @@ public class StandardSnippet implements Snippet { inputPorts.addAll(ids); } - /** - * The output ports in this snippet. - * - * @return - */ + @Override public Set<String> getOutputPorts() { return Collections.unmodifiableSet(outputPorts); } @@ -137,11 +109,7 @@ public class StandardSnippet implements Snippet { outputPorts.addAll(ids); } - /** - * The labels in this snippet. - * - * @return - */ + @Override public Set<String> getLabels() { return Collections.unmodifiableSet(labels); } @@ -150,6 +118,7 @@ public class StandardSnippet implements Snippet { labels.addAll(ids); } + @Override public Set<String> getProcessGroups() { return Collections.unmodifiableSet(processGroups); } @@ -158,6 +127,7 @@ public class StandardSnippet implements Snippet { processGroups.addAll(ids); } + @Override public Set<String> getProcessors() { return Collections.unmodifiableSet(processors); } @@ -166,6 +136,7 @@ public class StandardSnippet implements Snippet { processors.addAll(ids); } + @Override public Set<String> getRemoteProcessGroups() { return Collections.unmodifiableSet(remoteProcessGroups); } @@ -174,11 +145,7 @@ public class StandardSnippet implements Snippet { remoteProcessGroups.addAll(ids); } - /** - * Determines if this snippet is empty. - * - * @return - */ + @Override public boolean isEmpty() { return processors.isEmpty() && processGroups.isEmpty() && remoteProcessGroups.isEmpty() && labels.isEmpty() && inputPorts.isEmpty() && outputPorts.isEmpty() && connections.isEmpty() && funnels.isEmpty(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/Template.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/Template.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/Template.java index 26c4cd2..c2e8e04 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/Template.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/Template.java @@ -29,7 +29,7 @@ public class Template { /** * Returns a TemplateDTO object that describes the contents of this Template * - * @return + * @return template dto */ public TemplateDTO getDetails() { return dto; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateManager.java index 30d4365..6a11a33 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateManager.java @@ -103,7 +103,7 @@ public class TemplateManager { * snippet of this flow. Any sensitive properties in the TemplateDTO will be * removed. * - * @param dto + * @param dto dto * @return a copy of the given DTO * @throws IOException if an I/O error occurs when persisting the Template * @throws NullPointerException if the DTO is null @@ -144,6 +144,8 @@ public class TemplateManager { /** * Clears all Templates from the TemplateManager + * + * @throws java.io.IOException ioe */ public void clear() throws IOException { writeLock.lock(); @@ -175,10 +177,8 @@ public class TemplateManager { } /** - * Returns the template with the given id, if it exists; else, returns null - * - * @param id - * @return + * @param id template id + * @return the template with the given id, if it exists; else, returns null */ public Template getTemplate(final String id) { readLock.lock(); @@ -192,7 +192,7 @@ public class TemplateManager { /** * Loads the templates from disk * - * @throws IOException + * @throws IOException ioe */ public void loadTemplates() throws IOException { writeLock.lock(); @@ -237,8 +237,8 @@ public class TemplateManager { /** * Persists the given template to disk * - * @param dto - * @throws IOException + * @param template template + * @throws IOException ioe */ private void persistTemplate(final Template template) throws IOException { final Path path = directory.resolve(template.getDetails().getId() + ".template"); @@ -249,7 +249,7 @@ public class TemplateManager { * Scrubs the template prior to persisting in order to remove fields that * shouldn't be included or are unnecessary. * - * @param snippet + * @param snippet snippet */ private void scrubTemplate(final FlowSnippetDTO snippet) { // ensure that contents have been specified @@ -273,7 +273,7 @@ public class TemplateManager { if (snippet.getProcessGroups() != null) { scrubProcessGroups(snippet.getProcessGroups()); } - + // go through each controller service if specified if (snippet.getControllerServices() != null) { scrubControllerServices(snippet.getControllerServices()); @@ -284,7 +284,7 @@ public class TemplateManager { /** * Scrubs process groups prior to saving. * - * @param processGroups + * @param processGroups groups */ private void scrubProcessGroups(final Set<ProcessGroupDTO> processGroups) { // go through each process group @@ -297,7 +297,7 @@ public class TemplateManager { * Scrubs processors prior to saving. This includes removing sensitive * properties, validation errors, property descriptors, etc. * - * @param snippet + * @param processors procs */ private void scrubProcessors(final Set<ProcessorDTO> processors) { // go through each processor @@ -328,20 +328,20 @@ public class TemplateManager { processorDTO.setValidationErrors(null); } } - + private void scrubControllerServices(final Set<ControllerServiceDTO> controllerServices) { - for ( final ControllerServiceDTO serviceDTO : controllerServices ) { + for (final ControllerServiceDTO serviceDTO : controllerServices) { final Map<String, String> properties = serviceDTO.getProperties(); final Map<String, PropertyDescriptorDTO> descriptors = serviceDTO.getDescriptors(); - - if ( properties != null && descriptors != null ) { - for ( final PropertyDescriptorDTO descriptor : descriptors.values() ) { - if ( descriptor.isSensitive() ) { + + if (properties != null && descriptors != null) { + for (final PropertyDescriptorDTO descriptor : descriptors.values()) { + if (descriptor.isSensitive()) { properties.put(descriptor.getName(), null); } } } - + serviceDTO.setCustomUiUrl(null); serviceDTO.setValidationErrors(null); } @@ -351,7 +351,7 @@ public class TemplateManager { * Scrubs connections prior to saving. This includes removing available * relationships. * - * @param snippet + * @param connections conns */ private void scrubConnections(final Set<ConnectionDTO> connections) { // go through each connection @@ -366,7 +366,7 @@ public class TemplateManager { /** * Remove unnecessary fields in connectables prior to saving. * - * @param connectable + * @param connectable connectable */ private void scrubConnectable(final ConnectableDTO connectable) { if (connectable != null) { @@ -381,7 +381,7 @@ public class TemplateManager { /** * Remove unnecessary fields in remote groups prior to saving. * - * @param remoteGroups + * @param remoteGroups groups */ private void scrubRemoteProcessGroups(final Set<RemoteProcessGroupDTO> remoteGroups) { // go through each remote process group @@ -411,7 +411,7 @@ public class TemplateManager { /** * Remove unnecessary fields in remote ports prior to saving. * - * @param remotePorts + * @param remotePorts ports */ private void scrubRemotePorts(final Set<RemoteProcessGroupPortDTO> remotePorts) { for (final Iterator<RemoteProcessGroupPortDTO> remotePortIter = remotePorts.iterator(); remotePortIter.hasNext();) { @@ -455,7 +455,7 @@ public class TemplateManager { } catch (final IOException e) { logger.error(String.format("Unable to remove template file for template %s.", id)); - // since the template file existed and we were unable to remove it, rollback + // since the template file existed and we were unable to remove it, rollback // by returning it to the template map templateMap.put(id, removed); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java index 18d2c5f..7adab68 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java @@ -49,10 +49,10 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon private final AtomicReference<SchedulingStrategy> schedulingStrategy = new AtomicReference<>(SchedulingStrategy.TIMER_DRIVEN); private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("5 mins"); - + private volatile String comment; private volatile ScheduledState scheduledState = ScheduledState.STOPPED; - + public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id, final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory) { @@ -96,7 +96,7 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon public boolean isRunning() { return processScheduler.isScheduled(this) || processScheduler.getActiveThreadCount(this) > 0; } - + @Override public int getActiveThreadCount() { return processScheduler.getActiveThreadCount(this); @@ -118,31 +118,31 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon public ScheduledState getScheduledState() { return scheduledState; } - + @Override public void setScheduledState(final ScheduledState state) { this.scheduledState = state; } - + @Override public void setProperty(final String name, final String value) { super.setProperty(name, value); - + onConfigured(); } - + @Override public boolean removeProperty(String name) { final boolean removed = super.removeProperty(name); - if ( removed ) { + if (removed) { onConfigured(); } - + return removed; } - + @SuppressWarnings("deprecation") - private void onConfigured() { + private void onConfigured() { // We need to invoke any method annotation with the OnConfigured annotation in order to // maintain backward compatibility. This will be removed when we remove the old, deprecated annotations. try (final NarCloseable x = NarCloseable.withNarLoader()) { @@ -152,72 +152,71 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon throw new ComponentLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + reportingTask, e); } } - + public boolean isDisabled() { return scheduledState == ScheduledState.DISABLED; } - + @Override public String getComments() { - return comment; - } + return comment; + } @Override - public void setComments(final String comment) { - this.comment = comment; - } + public void setComments(final String comment) { + this.comment = comment; + } - @Override + @Override public void verifyCanDelete() { if (isRunning()) { throw new IllegalStateException("Cannot delete " + reportingTask + " because it is currently running"); } } - + @Override public void verifyCanDisable() { - if ( isRunning() ) { + if (isRunning()) { throw new IllegalStateException("Cannot disable " + reportingTask + " because it is currently running"); } - - if ( isDisabled() ) { + + if (isDisabled()) { throw new IllegalStateException("Cannot disable " + reportingTask + " because it is already disabled"); } } - - + @Override public void verifyCanEnable() { - if ( !isDisabled() ) { + if (!isDisabled()) { throw new IllegalStateException("Cannot enable " + reportingTask + " because it is not disabled"); } } - + @Override public void verifyCanStart() { - if ( isDisabled() ) { + if (isDisabled()) { throw new IllegalStateException("Cannot start " + reportingTask + " because it is currently disabled"); } - - if ( isRunning() ) { + + if (isRunning()) { throw new IllegalStateException("Cannot start " + reportingTask + " because it is already running"); } } - + @Override public void verifyCanStop() { - if ( !isRunning() ) { + if (!isRunning()) { throw new IllegalStateException("Cannot stop " + reportingTask + " because it is not running"); } } - + @Override public void verifyCanUpdate() { - if ( isRunning() ) { + if (isRunning()) { throw new IllegalStateException("Cannot update " + reportingTask + " because it is currently running"); } } - + @Override public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) { switch (getScheduledState()) { @@ -229,24 +228,23 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon break; } final int activeThreadCount = getActiveThreadCount(); - if ( activeThreadCount > 0 ) { + if (activeThreadCount > 0) { throw new IllegalStateException(this + " cannot be started because it has " + activeThreadCount + " active threads already"); } - + final Set<String> ids = new HashSet<>(); - for ( final ControllerServiceNode node : ignoredReferences ) { + for (final ControllerServiceNode node : ignoredReferences) { ids.add(node.getIdentifier()); } - + final Collection<ValidationResult> validationResults = getValidationErrors(ids); - for ( final ValidationResult result : validationResults ) { - if ( !result.isValid() ) { + for (final ValidationResult result : validationResults) { + if (!result.isValid()) { throw new IllegalStateException(this + " cannot be started because it is not valid: " + result); } } } - - + @Override public String toString() { return "ReportingTask[id=" + getIdentifier() + ", name=" + getName() + "]"; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java index 3d57533..2536d6f 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java @@ -124,7 +124,7 @@ public class StandardReportingContext implements ReportingContext, ControllerSer public boolean isControllerServiceEnabled(final String serviceIdentifier) { return serviceProvider.isControllerServiceEnabled(serviceIdentifier); } - + @Override public boolean isControllerServiceEnabling(final String serviceIdentifier) { return serviceProvider.isControllerServiceEnabling(serviceIdentifier); @@ -135,9 +135,9 @@ public class StandardReportingContext implements ReportingContext, ControllerSer return this; } - @Override - public String getControllerServiceName(final String serviceIdentifier) { - return serviceProvider.getControllerServiceName(serviceIdentifier); - } - + @Override + public String getControllerServiceName(final String serviceIdentifier) { + return serviceProvider.getControllerServiceName(serviceIdentifier); + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java index 435dbd0..0131a95 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java @@ -35,8 +35,8 @@ public class StandardReportingInitializationContext implements ReportingInitiali private final SchedulingStrategy schedulingStrategy; private final ControllerServiceProvider serviceProvider; private final ComponentLog logger; - - public StandardReportingInitializationContext(final String id, final String name, final SchedulingStrategy schedulingStrategy, + + public StandardReportingInitializationContext(final String id, final String name, final SchedulingStrategy schedulingStrategy, final String schedulingPeriod, final ComponentLog logger, final ControllerServiceProvider serviceProvider) { this.id = id; this.name = name; @@ -97,17 +97,17 @@ public class StandardReportingInitializationContext implements ReportingInitiali public boolean isControllerServiceEnabling(final String serviceIdentifier) { return serviceProvider.isControllerServiceEnabling(serviceIdentifier); } - + @Override public ControllerServiceLookup getControllerServiceLookup() { return this; } - + @Override public String getControllerServiceName(final String serviceIdentifier) { - return serviceProvider.getControllerServiceName(serviceIdentifier); + return serviceProvider.getControllerServiceName(serviceIdentifier); } - + @Override public ComponentLog getLogger() { return logger; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index 2c3751b..d69b417 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -79,7 +79,6 @@ import org.slf4j.LoggerFactory; /** * Is thread safe * - * @author none */ public class FileSystemRepository implements ContentRepository { @@ -102,7 +101,7 @@ public class FileSystemRepository implements ContentRepository { private final boolean alwaysSync; private final ScheduledExecutorService containerCleanupExecutor; - private ContentClaimManager contentClaimManager; // effectively final + private ContentClaimManager contentClaimManager; // effectively final // Map of contianer to archived files that should be deleted next. private final Map<String, BlockingQueue<ArchiveInfo>> archivedFiles = new HashMap<>(); @@ -137,7 +136,8 @@ public class FileSystemRepository implements ContentRepository { archiveData = true; if (maxArchiveSize == null) { - throw new RuntimeException("No value specified for property '" + NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' but archiving is enabled. You must configure the max disk usage in order to enable archiving."); + throw new RuntimeException("No value specified for property '" + + NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' but archiving is enabled. You must configure the max disk usage in order to enable archiving."); } if (!MAX_ARCHIVE_SIZE_PATTERN.matcher(maxArchiveSize.trim()).matches()) { @@ -228,7 +228,7 @@ public class FileSystemRepository implements ContentRepository { executor.shutdown(); containerCleanupExecutor.shutdown(); } - + private static double getRatio(final String value) { final String trimmed = value.trim(); final String percentage = trimmed.substring(0, trimmed.length() - 1); @@ -241,7 +241,7 @@ public class FileSystemRepository implements ContentRepository { final List<Future<Long>> futures = new ArrayList<>(); // Run through each of the containers. For each container, create the sections if necessary. - // Then, we need to scan through the archived data so that we can determine what the oldest + // Then, we need to scan through the archived data so that we can determine what the oldest // archived data is, so that we know when we have to start aging data off. for (final Map.Entry<String, Path> container : containers.entrySet()) { final String containerName = container.getKey(); @@ -263,7 +263,7 @@ public class FileSystemRepository implements ContentRepository { realPathMap.put(containerName, realPath); // We need to scan the archive directories to find out the oldest timestamp so that know whether or not we - // will have to delete archived data based on time threshold. Scanning all of the directories can be very + // will have to delete archived data based on time threshold. Scanning all of the directories can be very // expensive because of all of the disk accesses. So we do this in multiple threads. Since containers are // often unique to a disk, we just map 1 thread to each container. final Callable<Long> scanContainer = new Callable<Long>() { @@ -765,7 +765,7 @@ public class FileSystemRepository implements ContentRepository { throw new RepositoryPurgeException("File " + path.toFile().getAbsolutePath() + " does not exist"); } - // Try up to 10 times to see if the directory is writable, in case another process (like a + // Try up to 10 times to see if the directory is writable, in case another process (like a // virus scanner) has the directory temporarily locked boolean writable = false; for (int i = 0; i < 10; i++) { @@ -803,17 +803,18 @@ public class FileSystemRepository implements ContentRepository { if (toDestroy.isEmpty()) { return; } - + for (final ContentClaim claim : toDestroy) { final String container = claim.getContainer(); final BlockingQueue<ContentClaim> claimQueue = reclaimable.get(container); - + try { while (true) { if (claimQueue.offer(claim, 10, TimeUnit.MINUTES)) { break; } else { - LOG.warn("Failed to clean up {} because old claims aren't being cleaned up fast enough. This Content Claim will remain in the Content Repository until NiFi is restarted, at which point it will be cleaned up", claim); + LOG.warn("Failed to clean up {} because old claims aren't being cleaned up fast enough. " + + "This Content Claim will remain in the Content Repository until NiFi is restarted, at which point it will be cleaned up", claim); } } } catch (final InterruptedException ie) { @@ -971,7 +972,8 @@ public class FileSystemRepository implements ContentRepository { // Otherwise, we're done. Return the last mod time of the oldest file in the container's archive. final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - LOG.info("Deleted {} files from archive for Container {}; oldest Archive Date is now {}; container cleanup took {} millis", deleteCount, containerName, new Date(oldestArchiveDate), millis); + LOG.info("Deleted {} files from archive for Container {}; oldest Archive Date is now {}; container cleanup took {} millis", + deleteCount, containerName, new Date(oldestArchiveDate), millis); return oldestArchiveDate; } @@ -1005,7 +1007,8 @@ public class FileSystemRepository implements ContentRepository { try { Files.deleteIfExists(file); containerState.decrementArchiveCount(); - LOG.debug("Deleted archived ContentClaim with ID {} from Container {} because it was older than the configured max archival duration", file.toFile().getName(), containerName); + LOG.debug("Deleted archived ContentClaim with ID {} from Container {} because it was older than the configured max archival duration", + file.toFile().getName(), containerName); } catch (final IOException ioe) { LOG.warn("Failed to remove archived ContentClaim with ID {} from Container {} due to {}", file.toFile().getName(), containerName, ioe.toString()); if (LOG.isDebugEnabled()) { @@ -1208,7 +1211,7 @@ public class FileSystemRepository implements ContentRepository { if (minRequiredSpace == null) { return; } - + try { final long usableSpace = getContainerUsableSpace(containerName); if (usableSpace > minRequiredSpace) { @@ -1277,11 +1280,9 @@ public class FileSystemRepository implements ContentRepository { } /** - * Returns {@code true} if wait is required to create claims against + * @return {@code true} if wait is required to create claims against * this Container, based on whether or not the container has reached its - * back pressure threshold. - * - * @return + * back pressure threshold */ public boolean isWaitRequired() { if (!archiveEnabled) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProcessContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProcessContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProcessContext.java index 7434efe..7502641 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProcessContext.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProcessContext.java @@ -65,8 +65,8 @@ public class ProcessContext { /** * - * @param relatonship - * @return + * @param relationship relationship + * @return connections for relationship */ Collection<Connection> getConnections(final Relationship relationship) { Collection<Connection> collection = connectable.getConnections(relationship); @@ -106,15 +106,13 @@ public class ProcessContext { } /** - * Returns true if we are allowed to take FlowFiles only from self-loops. + * @return true if we are allowed to take FlowFiles only from self-loops. * This is the case when no Relationships are available except for * self-looping Connections - * - * @return */ private boolean pollFromSelfLoopsOnly() { if (isTriggerWhenAnyDestinationAvailable()) { - // we can pull from any incoming connection, as long as at least one downstream connection + // we can pull from any incoming connection, as long as at least one downstream connection // is available for each relationship. // I.e., we can poll only from self if no relationships are available return !Connectables.anyRelationshipAvailable(connectable); @@ -219,15 +217,14 @@ public class ProcessContext { } /** - * Checks if at least <code>requiredNumber</code> of Relationationships are - * "available." If so, returns <code>true</code>, otherwise returns - * <code>false</code>. - * * A Relationship is said to be Available if and only if all Connections for * that Relationship are either self-loops or have non-full queues. * - * @param requiredNumber - * @return + * @param requiredNumber minimum number of relationships that must have + * availability + * @return Checks if at least <code>requiredNumber</code> of + * Relationationships are "available." If so, returns <code>true</code>, + * otherwise returns <code>false</code> */ public boolean isRelationshipAvailabilitySatisfied(final int requiredNumber) { int unavailable = 0; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java index ae73bda..db098fc 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java @@ -25,9 +25,9 @@ public interface ProvenanceEventEnricher { * Returns a new Provenance event that has been updated to contain the * original and updated FlowFile attributes and content claim information. * - * @param record - * @param flowFile - * @return + * @param record record + * @param flowFile flowfile + * @return new event record */ ProvenanceEventRecord enrich(ProvenanceEventRecord record, FlowFile flowFile); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryPurgeException.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryPurgeException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryPurgeException.java index 613baa8..e668be3 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryPurgeException.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryPurgeException.java @@ -20,38 +20,24 @@ package org.apache.nifi.controller.repository; * This exception is thrown when a flow file repository was unable to be * properly purged. * - * @author unattributed * */ public class RepositoryPurgeException extends RuntimeException { private static final long serialVersionUID = 1894237987230873423L; - /** - * Default constructor - */ public RepositoryPurgeException() { super(); } - /** - * @param message - */ public RepositoryPurgeException(String message) { super(message); } - /** - * @param cause - */ public RepositoryPurgeException(Throwable cause) { super(cause); } - /** - * @param message - * @param cause - */ public RepositoryPurgeException(String message, Throwable cause) { super(message, cause); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RingBufferEventRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RingBufferEventRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RingBufferEventRepository.java index 6f222d9..fb3cdd2 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RingBufferEventRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RingBufferEventRepository.java @@ -67,7 +67,7 @@ public class RingBufferEventRepository implements FlowFileEventRepository { @Override public void purgeTransferEvents(final long cutoffEpochMilliseconds) { - // This is done so that if a processor is removed from the graph, its events + // This is done so that if a processor is removed from the graph, its events // will be removed rather than being kept in memory for (final EventContainer container : componentEventMap.values()) { container.purgeEvents(cutoffEpochMilliseconds); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java index d4c7e47..433c3d2 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java @@ -130,8 +130,8 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { * Provides the natural ordering for FlowFile objects which is based on * their identifier. * - * @param other - * @return + * @param other other + * @return standard compare contract */ @Override public int compareTo(final FlowFile other) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index e5cd03e..4827ab7 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -307,7 +307,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // Figure out which content claims can be released. // At this point, we will decrement the Claimant Count for the claims via the Content Repository. - // We do not actually destroy the content because otherwise, we could remove the + // We do not actually destroy the content because otherwise, we could remove the // Original Claim and crash/restart before the FlowFileRepository is updated. This will result in the FlowFile being restored such that // the content claim points to the Original Claim -- which has already been removed! for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) { @@ -490,14 +490,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private void addEventType(final Map<String, Set<ProvenanceEventType>> map, final String id, final ProvenanceEventType eventType) { Set<ProvenanceEventType> eventTypes = map.get(id); - if ( eventTypes == null ) { + if (eventTypes == null) { eventTypes = new HashSet<>(); map.put(id, eventTypes); } - + eventTypes.add(eventType); } - + private void updateProvenanceRepo(final Checkpoint checkpoint) { // Update Provenance Repository final ProvenanceEventRepository provenanceRepo = context.getProvenanceRepository(); @@ -507,7 +507,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // for this, so that we are able to ensure that the events are submitted in the proper order. final Set<ProvenanceEventRecord> recordsToSubmit = new LinkedHashSet<>(); final Map<String, Set<ProvenanceEventType>> eventTypesPerFlowFileId = new HashMap<>(); - + final Set<ProvenanceEventRecord> processorGenerated = checkpoint.reportedEvents; // We first want to submit FORK events because if the Processor is going to create events against @@ -524,11 +524,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE if (!event.getChildUuids().isEmpty() && !isSpuriousForkEvent(event, checkpoint.removedFlowFiles) && !processorGenerated.contains(event)) { recordsToSubmit.add(event); - - for ( final String childUuid : event.getChildUuids() ) { + + for (final String childUuid : event.getChildUuids()) { addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType()); } - for ( final String parentUuid : event.getParentUuids() ) { + for (final String parentUuid : event.getParentUuids()) { addEventType(eventTypesPerFlowFileId, parentUuid, event.getEventType()); } } @@ -539,14 +539,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) { continue; } - if ( isSpuriousRouteEvent(event, checkpoint.records) ) { + if (isSpuriousRouteEvent(event, checkpoint.records)) { continue; } - - // Check if the event indicates that the FlowFile was routed to the same + + // Check if the event indicates that the FlowFile was routed to the same // connection from which it was pulled (and only this connection). If so, discard the event. isSpuriousRouteEvent(event, checkpoint.records); - + recordsToSubmit.add(event); addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType()); } @@ -562,62 +562,62 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType()); } } - + // Check if content or attributes changed. If so, register the appropriate events. - for (final StandardRepositoryRecord repoRecord : checkpoint.records.values() ) { + for (final StandardRepositoryRecord repoRecord : checkpoint.records.values()) { final ContentClaim original = repoRecord.getOriginalClaim(); final ContentClaim current = repoRecord.getCurrentClaim(); - + boolean contentChanged = false; - if ( original == null && current != null ) { + if (original == null && current != null) { contentChanged = true; } - if ( original != null && current == null ) { + if (original != null && current == null) { contentChanged = true; } - if ( original != null && current != null && !original.equals(current) ) { + if (original != null && current != null && !original.equals(current)) { contentChanged = true; } - + final FlowFileRecord curFlowFile = repoRecord.getCurrent(); final String flowFileId = curFlowFile.getAttribute(CoreAttributes.UUID.key()); boolean eventAdded = false; - + if (checkpoint.removedFlowFiles.contains(flowFileId)) { continue; } - + final boolean newFlowFile = repoRecord.getOriginal() == null; - if ( contentChanged && !newFlowFile ) { + if (contentChanged && !newFlowFile) { recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CONTENT_MODIFIED).build()); addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.CONTENT_MODIFIED); eventAdded = true; } - - if ( checkpoint.createdFlowFiles.contains(flowFileId) ) { + + if (checkpoint.createdFlowFiles.contains(flowFileId)) { final Set<ProvenanceEventType> registeredTypes = eventTypesPerFlowFileId.get(flowFileId); boolean creationEventRegistered = false; - if ( registeredTypes != null ) { - if ( registeredTypes.contains(ProvenanceEventType.CREATE) || - registeredTypes.contains(ProvenanceEventType.FORK) || - registeredTypes.contains(ProvenanceEventType.JOIN) || - registeredTypes.contains(ProvenanceEventType.RECEIVE) ) { + if (registeredTypes != null) { + if (registeredTypes.contains(ProvenanceEventType.CREATE) + || registeredTypes.contains(ProvenanceEventType.FORK) + || registeredTypes.contains(ProvenanceEventType.JOIN) + || registeredTypes.contains(ProvenanceEventType.RECEIVE)) { creationEventRegistered = true; } } - - if ( !creationEventRegistered ) { + + if (!creationEventRegistered) { recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CREATE).build()); eventAdded = true; } } - - if ( !eventAdded && !repoRecord.getUpdatedAttributes().isEmpty() ) { + + if (!eventAdded && !repoRecord.getUpdatedAttributes().isEmpty()) { // We generate an ATTRIBUTES_MODIFIED event only if no other event has been // created for the FlowFile. We do this because all events contain both the // newest and the original attributes, so generating an ATTRIBUTES_MODIFIED // event is redundant if another already exists. - if ( !eventTypesPerFlowFileId.containsKey(flowFileId) ) { + if (!eventTypesPerFlowFileId.containsKey(flowFileId)) { recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED).build()); addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.ATTRIBUTES_MODIFIED); } @@ -689,8 +689,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE if (originalClaim == null) { builder.setCurrentContentClaim(null, null, null, null, 0L); } else { - builder.setCurrentContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()); - builder.setPreviousContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()); + builder.setCurrentContentClaim( + originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize() + ); + builder.setPreviousContentClaim( + originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize() + ); } } @@ -725,7 +729,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return recordBuilder.build(); } - private StandardProvenanceEventRecord enrich(final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) { + private StandardProvenanceEventRecord enrich( + final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) { final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent); final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid()); if (eventFlowFile != null) { @@ -770,8 +775,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE * when a Processor calls #create(FlowFile) and then removes the created * FlowFile. * - * @param event - * @return + * @param event event + * @return true if spurious fork */ private boolean isSpuriousForkEvent(final ProvenanceEventRecord event, final Set<String> removedFlowFiles) { if (event.getEventType() == ProvenanceEventType.FORK) { @@ -784,33 +789,33 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return false; } - /** - * Checks if the given event is a spurious ROUTE, meaning that the ROUTE indicates that a FlowFile - * was routed to a relationship with only 1 connection and that Connection is the Connection from which - * the FlowFile was pulled. I.e., the FlowFile was really routed nowhere. - * - * @param event - * @param records - * @return + * Checks if the given event is a spurious ROUTE, meaning that the ROUTE + * indicates that a FlowFile was routed to a relationship with only 1 + * connection and that Connection is the Connection from which the FlowFile + * was pulled. I.e., the FlowFile was really routed nowhere. + * + * @param event event + * @param records records + * @return true if spurious route */ private boolean isSpuriousRouteEvent(final ProvenanceEventRecord event, final Map<FlowFileRecord, StandardRepositoryRecord> records) { - if ( event.getEventType() == ProvenanceEventType.ROUTE ) { + if (event.getEventType() == ProvenanceEventType.ROUTE) { final String relationshipName = event.getRelationship(); final Relationship relationship = new Relationship.Builder().name(relationshipName).build(); final Collection<Connection> connectionsForRelationship = this.context.getConnections(relationship); - + // If the number of connections for this relationship is not 1, then we can't ignore this ROUTE event, // as it may be cloning the FlowFile and adding to multiple connections. - if ( connectionsForRelationship.size() == 1 ) { - for ( final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : records.entrySet() ) { + if (connectionsForRelationship.size() == 1) { + for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : records.entrySet()) { final FlowFileRecord flowFileRecord = entry.getKey(); - if ( event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key())) ) { + if (event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key()))) { final StandardRepositoryRecord repoRecord = entry.getValue(); - if ( repoRecord.getOriginalQueue() == null ) { + if (repoRecord.getOriginalQueue() == null) { return false; } - + final String originalQueueId = repoRecord.getOriginalQueue().getIdentifier(); final Connection destinationConnection = connectionsForRelationship.iterator().next(); final String destinationQueueId = destinationConnection.getFlowFileQueue().getIdentifier(); @@ -819,10 +824,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } } - + return false; } - + @Override public void rollback() { rollback(false); @@ -919,7 +924,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE /** * Destroys a ContentClaim that was being written to but is no longer needed * - * @param claim + * @param claim claim to destroy */ private void destroyContent(final ContentClaim claim) { if (claim == null) { @@ -997,7 +1002,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final int numCreated = checkpoint.createdFlowFiles.size(); final StringBuilder sb = new StringBuilder(512); - if (!LOG.isDebugEnabled() && (largestTransferSetSize > VERBOSE_LOG_THRESHOLD || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) { + if (!LOG.isDebugEnabled() && (largestTransferSetSize > VERBOSE_LOG_THRESHOLD + || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) { if (numCreated > 0) { sb.append("created ").append(numCreated).append(" FlowFiles, "); } @@ -1966,7 +1972,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE throw ffae; } catch (final IOException ioe) { if (appendToClaim) { - resetWriteClaims(); // need to reset write claim before we can remove the claim + resetWriteClaims(); // need to reset write claim before we can remove the claim } destroyContent(newClaim); throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe); @@ -1992,7 +1998,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final StandardRepositoryRecord record = records.get(source); long newSize = 0L; - // Get the current Content Claim from the record and see if we already have + // Get the current Content Claim from the record and see if we already have // an OutputStream that we can append to. ContentClaim oldClaim = record.getCurrentClaim(); ByteCountingOutputStream outStream = appendableStreams.get(oldClaim); @@ -2087,17 +2093,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE * since it was last committed to the FlowFile repository, because this * indicates that the content is no longer needed and should be cleaned up. * - * @param record + * @param record record */ private void removeTemporaryClaim(final StandardRepositoryRecord record) { final boolean contentModified = record.getWorkingClaim() != null && record.getWorkingClaim() != record.getOriginalClaim(); // If the working claim is not the same as the original claim, we have modified the content of - // the FlowFile, and we need to remove the newly created file (the working claim). However, if - // they are the same, we cannot just remove the claim because record.getWorkingClaim() will return - // the original claim if the record is "working" but the content has not been modified + // the FlowFile, and we need to remove the newly created file (the working claim). However, if + // they are the same, we cannot just remove the claim because record.getWorkingClaim() will return + // the original claim if the record is "working" but the content has not been modified // (e.g., in the case of attributes only were updated) - // // In other words: // If we modify the attributes of a FlowFile, and then we call record.getWorkingClaim(), this will // return the same claim as record.getOriginalClaim(). So we cannot just remove the working claim because @@ -2150,10 +2155,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } /** - * Indicates whether or not multiple FlowFiles should be merged into a - * single ContentClaim - * - * @return + * @return Indicates whether or not multiple FlowFiles should be merged into + * a single ContentClaim */ private boolean isMergeContent() { if (writeRecursionLevel > 0) { @@ -2364,7 +2367,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE enforceCurrentWriteClaimState(); newClaim = currentWriteClaim; claimOffset = currentWriteClaimSize; - + final long bytesCopied = StreamUtils.copy(source, currentWriteClaimStream); bytesWritten.increment(bytesCopied); currentWriteClaimSize += bytesCopied; @@ -2373,7 +2376,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE try { newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination); - + newSize = context.getContentRepository().importFrom(source, newClaim, appendToClaim); bytesWritten.increment(newSize); currentWriteClaimSize += newSize; @@ -2386,7 +2389,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE resetWriteClaims(); } - if ( newClaim != null ) { + if (newClaim != null) { destroyContent(newClaim); } throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + t.toString(), t); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java index 23d090c..5e8bb3e 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java @@ -56,7 +56,7 @@ public class StandardProvenanceReporter implements ProvenanceReporter { /** * Removes the given event from the reporter * - * @param event + * @param event event */ void remove(final ProvenanceEventRecord event) { events.remove(event); @@ -72,9 +72,9 @@ public class StandardProvenanceReporter implements ProvenanceReporter { * ability to de-dupe events, since one or more events may be created by the * session itself, as well as by the Processor * - * @param parents - * @param child - * @return + * @param parents parents + * @param child child + * @return record */ ProvenanceEventRecord generateJoinEvent(final Collection<FlowFile> parents, final FlowFile child) { final ProvenanceEventBuilder eventBuilder = build(child, ProvenanceEventType.JOIN); @@ -114,7 +114,8 @@ public class StandardProvenanceReporter implements ProvenanceReporter { @Override public void receive(final FlowFile flowFile, final String transitUri, final String sourceSystemFlowFileIdentifier, final String details, final long transmissionMillis) { try { - final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.RECEIVE).setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build(); + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.RECEIVE) + .setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build(); events.add(record); } catch (final Exception e) { logger.error("Failed to generate Provenance Event due to " + e); @@ -160,7 +161,7 @@ public class StandardProvenanceReporter implements ProvenanceReporter { final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.SEND).setTransitUri(transitUri).setEventDuration(transmissionMillis).setDetails(details).build(); final ProvenanceEventRecord enriched = eventEnricher.enrich(record, flowFile); - + if (force) { repository.registerEvent(enriched); } else { @@ -326,7 +327,7 @@ public class StandardProvenanceReporter implements ProvenanceReporter { } } } - + @Override public void modifyContent(final FlowFile flowFile) { modifyContent(flowFile, null, -1L); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java index 6ecb991..c965ed8 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java @@ -24,10 +24,6 @@ import org.apache.nifi.controller.FlowFileQueue; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.processor.Relationship; -/** - * - * @author - */ public class StandardRepositoryRecord implements RepositoryRecord { private RepositoryRecordType type = null; @@ -44,7 +40,7 @@ public class StandardRepositoryRecord implements RepositoryRecord { * Creates a new record which has no original claim or flow file - it is * entirely new * - * @param originalQueue + * @param originalQueue queue */ public StandardRepositoryRecord(final FlowFileQueue originalQueue) { this(originalQueue, null); @@ -54,8 +50,8 @@ public class StandardRepositoryRecord implements RepositoryRecord { /** * Creates a record based on given original items * - * @param originalQueue - * @param originalFlowFileRecord + * @param originalQueue queue + * @param originalFlowFileRecord record */ public StandardRepositoryRecord(final FlowFileQueue originalQueue, final FlowFileRecord originalFlowFileRecord) { this(originalQueue, originalFlowFileRecord, null); @@ -116,22 +112,22 @@ public class StandardRepositoryRecord implements RepositoryRecord { public void setWorking(final FlowFileRecord flowFile, final String attributeKey, final String attributeValue) { workingFlowFileRecord = flowFile; - + // If setting attribute to same value as original, don't add to updated attributes final String currentValue = originalAttributes.get(attributeKey); - if ( currentValue == null || !currentValue.equals(attributeValue) ) { - updatedAttributes.put(attributeKey, attributeValue); + if (currentValue == null || !currentValue.equals(attributeValue)) { + updatedAttributes.put(attributeKey, attributeValue); } } public void setWorking(final FlowFileRecord flowFile, final Map<String, String> updatedAttribs) { workingFlowFileRecord = flowFile; - - for ( final Map.Entry<String, String> entry : updatedAttribs.entrySet() ) { - final String currentValue = originalAttributes.get(entry.getKey()); - if ( currentValue == null || !currentValue.equals(entry.getValue()) ) { - updatedAttributes.put(entry.getKey(), entry.getValue()); - } + + for (final Map.Entry<String, String> entry : updatedAttribs.entrySet()) { + final String currentValue = originalAttributes.get(entry.getKey()); + if (currentValue == null || !currentValue.equals(entry.getValue())) { + updatedAttributes.put(entry.getKey(), entry.getValue()); + } } }
