[NIFI-784] Per review feedback, updating to use OnStopped instead of OnUnscheduled to avoid concurrency issue updating propertyMap. Also removed this from logger statements since it is already included.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/85079020 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/85079020 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/85079020 Branch: refs/heads/develop Commit: 85079020967918a5afeb4eba8c9e6e99766a1718 Parents: 4a43e81 Author: Brian Ghigiarelli <[email protected]> Authored: Fri Jul 24 15:26:22 2015 -0400 Committer: Brian Ghigiarelli <[email protected]> Committed: Fri Jul 24 15:26:22 2015 -0400 ---------------------------------------------------------------------- .../processors/standard/RouteOnAttribute.java | 24 ++++++++++++-------- 1 file changed, 15 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/85079020/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java index 52dc80b..a9be2e7 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java @@ -34,7 +34,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnUnscheduled; 
+import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -105,7 +105,12 @@ public class RouteOnAttribute extends AbstractProcessor { private volatile String configuredRouteStrategy = ROUTE_STRATEGY.getDefaultValue(); private volatile Set<String> dynamicPropertyNames = new HashSet<>(); - private final Map<Relationship, PropertyValue> propertyMap = new HashMap<>(); + /** + * Cache of dynamic properties set during {@link #onScheduled(ProcessContext)} and + * cleared during {@link #onStopped(ProcessContext)} for quick access in + * {@link #onTrigger(ProcessContext, ProcessSession)} + */ + private volatile Map<Relationship, PropertyValue> propertyMap = new HashMap<>(); @Override protected void init(final ProcessorInitializationContext context) { @@ -171,8 +176,9 @@ public class RouteOnAttribute extends AbstractProcessor { } /** - * When this processor is - * @param context + * When this processor is scheduled, update the dynamic properties into the map + * for quick access during each onTrigger call + * @param context ProcessContext used to retrieve dynamic properties */ @OnScheduled public void onScheduled(final ProcessContext context) { @@ -184,9 +190,9 @@ public class RouteOnAttribute extends AbstractProcessor { propertyMap.put(new Relationship.Builder().name(descriptor.getName()).build(), context.getProperty(descriptor)); } } - - @OnUnscheduled - public void onUnscheduled(final ProcessContext context) { + + @OnStopped + public void onStopped() { getLogger().debug("Clearing propertyMap"); propertyMap.clear(); } @@ -233,7 +239,7 @@ public class RouteOnAttribute extends AbstractProcessor { } if (destinationRelationships.isEmpty()) { - logger.info("{} routing {} to unmatched", new Object[]{ this, flowFile }); + logger.info("Routing {} to unmatched", new Object[]{ flowFile }); flowFile = session.putAttribute(flowFile, 
ROUTE_ATTRIBUTE_KEY, REL_NO_MATCH.getName()); session.getProvenanceReporter().route(flowFile, REL_NO_MATCH); session.transfer(flowFile, REL_NO_MATCH); @@ -253,7 +259,7 @@ public class RouteOnAttribute extends AbstractProcessor { // now transfer any clones generated for (final Map.Entry<Relationship, FlowFile> entry : transferMap.entrySet()) { - logger.info("{} cloned {} into {} and routing clone to relationship {}", new Object[]{ this, flowFile, entry.getValue(), entry.getKey() }); + logger.info("Cloned {} into {} and routing clone to relationship {}", new Object[]{ flowFile, entry.getValue(), entry.getKey() }); FlowFile updatedFlowFile = session.putAttribute(entry.getValue(), ROUTE_ATTRIBUTE_KEY, entry.getKey().getName()); session.getProvenanceReporter().route(updatedFlowFile, entry.getKey()); session.transfer(updatedFlowFile, entry.getKey());
