[NIFI-784] Per review feedback, updating to use OnStopped instead of 
OnUnscheduled to avoid a concurrency issue when updating propertyMap. Also 
removed `this` from logger statements since it is already included


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/85079020
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/85079020
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/85079020

Branch: refs/heads/develop
Commit: 85079020967918a5afeb4eba8c9e6e99766a1718
Parents: 4a43e81
Author: Brian Ghigiarelli <[email protected]>
Authored: Fri Jul 24 15:26:22 2015 -0400
Committer: Brian Ghigiarelli <[email protected]>
Committed: Fri Jul 24 15:26:22 2015 -0400

----------------------------------------------------------------------
 .../processors/standard/RouteOnAttribute.java   | 24 ++++++++++++--------
 1 file changed, 15 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/85079020/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java
index 52dc80b..a9be2e7 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java
@@ -34,7 +34,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
@@ -105,7 +105,12 @@ public class RouteOnAttribute extends AbstractProcessor {
     private volatile String configuredRouteStrategy = 
ROUTE_STRATEGY.getDefaultValue();
     private volatile Set<String> dynamicPropertyNames = new HashSet<>();
 
-    private final Map<Relationship, PropertyValue> propertyMap = new 
HashMap<>();
+    /**
+     * Cache of dynamic properties set during {@link 
#onScheduled(ProcessContext)} and
+     * cleared during {@link #onStopped(ProcessContext)} for quick access in
+     * {@link #onTrigger(ProcessContext, ProcessSession)}
+     */
+    private volatile Map<Relationship, PropertyValue> propertyMap = new 
HashMap<>();
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -171,8 +176,9 @@ public class RouteOnAttribute extends AbstractProcessor {
     }
 
     /**
-     * When this processor is 
-     * @param context
+     * When this processor is scheduled, update the dynamic properties into 
the map
+     * for quick access during each onTrigger call
+     * @param context ProcessContext used to retrieve dynamic properties
      */
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
@@ -184,9 +190,9 @@ public class RouteOnAttribute extends AbstractProcessor {
             propertyMap.put(new 
Relationship.Builder().name(descriptor.getName()).build(), 
context.getProperty(descriptor));
         }
     }
-    
-    @OnUnscheduled
-    public void onUnscheduled(final ProcessContext context) {
+
+    @OnStopped
+    public void onStopped() {
        getLogger().debug("Clearing propertyMap");
        propertyMap.clear();
     }
@@ -233,7 +239,7 @@ public class RouteOnAttribute extends AbstractProcessor {
         }
 
         if (destinationRelationships.isEmpty()) {
-            logger.info("{} routing {} to unmatched", new Object[]{ this, 
flowFile });
+            logger.info("Routing {} to unmatched", new Object[]{ flowFile });
             flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, 
REL_NO_MATCH.getName());
             session.getProvenanceReporter().route(flowFile, REL_NO_MATCH);
             session.transfer(flowFile, REL_NO_MATCH);
@@ -253,7 +259,7 @@ public class RouteOnAttribute extends AbstractProcessor {
 
             // now transfer any clones generated
             for (final Map.Entry<Relationship, FlowFile> entry : 
transferMap.entrySet()) {
-                logger.info("{} cloned {} into {} and routing clone to 
relationship {}", new Object[]{ this, flowFile, entry.getValue(), 
entry.getKey() });
+                logger.info("Cloned {} into {} and routing clone to 
relationship {}", new Object[]{ flowFile, entry.getValue(), entry.getKey() });
                 FlowFile updatedFlowFile = 
session.putAttribute(entry.getValue(), ROUTE_ATTRIBUTE_KEY, 
entry.getKey().getName());
                 session.getProvenanceReporter().route(updatedFlowFile, 
entry.getKey());
                 session.transfer(updatedFlowFile, entry.getKey());

Reply via email to