NIFI-305: Slight refactorings to provide more flexibility in concrete 
implementations


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/ec7f7e77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/ec7f7e77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/ec7f7e77

Branch: refs/heads/NIFI-250
Commit: ec7f7e7717750276a6cbf56edbfcca49b1299fa5
Parents: c01dff5
Author: Mark Payne <[email protected]>
Authored: Mon Feb 2 13:51:51 2015 -0500
Committer: Mark Payne <[email protected]>
Committed: Mon Feb 2 13:51:51 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/BinFiles.java      | 72 +++-----------------
 .../nifi/processors/standard/MergeContent.java  | 69 ++++++++++---------
 2 files changed, 46 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ec7f7e77/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
index 0a65c59..3d7dba1 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
@@ -19,11 +19,8 @@ package org.apache.nifi.processors.standard;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -39,7 +36,6 @@ import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -99,38 +95,9 @@ public abstract class BinFiles extends 
AbstractSessionFactoryProcessor {
     public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original").description("The FlowFiles that were 
used to create the bundle").build();
     public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure").description("If the bundle cannot be 
created, all FlowFiles that would have been used to created the bundle will be 
transferred to failure").build();
 
-    private Set<Relationship> relationships;
-    private List<PropertyDescriptor> descriptors;
     private final BinManager binManager = new BinManager();
-
     private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
 
-    @Override
-    protected final void init(final ProcessorInitializationContext context) {
-
-       final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_ORIGINAL);
-        relationships.add(REL_FAILURE);
-        Set<Relationship> additionalRelationships = 
defineAdditionalRelationships();
-        if (additionalRelationships != null) {
-               relationships.addAll(additionalRelationships);
-        }
-        this.relationships = Collections.unmodifiableSet(relationships);
-
-        final List<PropertyDescriptor> descriptors = new ArrayList<>();
-        descriptors.add(MIN_ENTRIES);
-        descriptors.add(MAX_ENTRIES);
-        descriptors.add(MIN_SIZE);
-        descriptors.add(MAX_SIZE);
-        descriptors.add(MAX_BIN_AGE);
-        descriptors.add(MAX_BIN_COUNT);
-        List<PropertyDescriptor> additionalPropertyDescriptors = 
this.defineAdditionalPropertyDescriptors();
-        if (additionalPropertyDescriptors != null) {
-               descriptors.addAll(additionalPropertyDescriptors);
-        }
-
-        this.descriptors = Collections.unmodifiableList(descriptors);
-    }
 
     @OnStopped
     public final void resetState() {
@@ -144,27 +111,6 @@ public abstract class BinFiles extends 
AbstractSessionFactoryProcessor {
         }
     }
 
-    @Override
-    public final Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected final List<PropertyDescriptor> getSupportedPropertyDescriptors() 
{
-        return descriptors;
-    }
-
-    /**
-     * Allows any additional relationships to be defined.
-     * @return Relationships to be added in the init() method
-     */ 
-    protected abstract Set<Relationship> defineAdditionalRelationships();
-    
-    /**
-     * Allows any additional property descriptors to be defined.
-     * @return Properties to be added in the init() method
-     */
-    protected abstract List<PropertyDescriptor> 
defineAdditionalPropertyDescriptors();
 
        /**
         * Allows general pre-processing of a flow file before it is offered to 
a
@@ -213,14 +159,14 @@ public abstract class BinFiles extends 
AbstractSessionFactoryProcessor {
         * to Failure and commit their sessions.  If false, the 
         * processBins() method will transfer the files to Original and commit
         * the sessions
-        * @throws Exception
-        *             This will be handled appropriately, and all flow files 
in the
-        *             bin will be transferred to failure and the session rolled
-        *             back
+        * 
+        * @throws ProcessException if any problem arises while processing a bin
+        *             of FlowFiles. All flow files in the
+        *             bin will be transferred to failure and the 
ProcessSession provided by
+        *             the 'session' argument rolled back
         */
-       protected abstract boolean processBin(Bin unmodifiableBin,
-                       List<FlowFileSessionWrapper> binContents, 
ProcessContext context,
-                       ProcessSession session, ProcessorLog logger) throws 
Exception;
+       protected abstract boolean processBin(Bin unmodifiableBin, 
+               List<FlowFileSessionWrapper> binContents, ProcessContext 
context, ProcessSession session) throws ProcessException;
 
     /**
         * Allows additional custom validation to be done. This will be called 
from
@@ -288,8 +234,8 @@ public abstract class BinFiles extends 
AbstractSessionFactoryProcessor {
 
        boolean binAlreadyCommitted = false;
         try {
-               binAlreadyCommitted = this.processBin(bin, binCopy, context, 
session, logger);
-        } catch (final Exception e) {
+               binAlreadyCommitted = this.processBin(bin, binCopy, context, 
session);
+        } catch (final ProcessException e) {
             logger.error("Failed to process bundle of {} files due to {}", new 
Object[]{binCopy.size(), e});
 
             for (final FlowFileSessionWrapper wrapper : binCopy) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ec7f7e77/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 73cb5a6..a78bc07 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -46,10 +46,10 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -198,56 +198,62 @@ public class MergeContent extends BinFiles {
 
     public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
 
+
        @Override
-       protected Set<Relationship> defineAdditionalRelationships() {
-        final Set<Relationship> relationships = new HashSet<>();
+       public Set<Relationship> getRelationships() {
+           final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_ORIGINAL);
+        relationships.add(REL_FAILURE);
         relationships.add(REL_MERGED);
-        
         return relationships;
-    }
-
+       }
+       
+       
        @Override
-       protected List<PropertyDescriptor> 
defineAdditionalPropertyDescriptors() {
-               final List<PropertyDescriptor> descriptors = new ArrayList<>();
-        descriptors.add(MERGE_STRATEGY);
-        descriptors.add(MERGE_FORMAT);
-        descriptors.add(ATTRIBUTE_STRATEGY);
-        descriptors.add(CORRELATION_ATTRIBUTE_NAME);
+       protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+           final List<PropertyDescriptor> descriptors = new ArrayList<>();
+           descriptors.add(MERGE_STRATEGY);
+           descriptors.add(MERGE_FORMAT);
+           descriptors.add(ATTRIBUTE_STRATEGY);
+           descriptors.add(CORRELATION_ATTRIBUTE_NAME);
+        descriptors.add(MIN_ENTRIES);
+        descriptors.add(MAX_ENTRIES);
+        descriptors.add(MIN_SIZE);
+        descriptors.add(MAX_SIZE);
+        descriptors.add(MAX_BIN_AGE);
+        descriptors.add(MAX_BIN_COUNT);
         descriptors.add(HEADER);
         descriptors.add(FOOTER);
         descriptors.add(DEMARCATOR);
         descriptors.add(COMPRESSION_LEVEL);
         descriptors.add(KEEP_PATH);
-        
         return descriptors;
        }
-
+       
     private byte[] readContent(final String filename) throws IOException {
         return Files.readAllBytes(Paths.get(filename));
     }
 
 
        @Override
-       protected FlowFile preprocessFlowFile(ProcessContext context,
-                       ProcessSession session, FlowFile flowFile) {
-               
+       protected FlowFile preprocessFlowFile(final ProcessContext context, 
final ProcessSession session, final FlowFile flowFile) {
+           FlowFile processed = flowFile;
         // handle backward compatibility with old segment attributes
-        if (flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && 
flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
-            flowFile = session.putAttribute(flowFile, 
FRAGMENT_COUNT_ATTRIBUTE, flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
+        if (processed.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && 
processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
+            processed = session.putAttribute(processed, 
FRAGMENT_COUNT_ATTRIBUTE, processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
         }
-        if (flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && 
flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
-            flowFile = session.putAttribute(flowFile, 
FRAGMENT_INDEX_ATTRIBUTE, flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
+        if (processed.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && 
processed.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
+            processed = session.putAttribute(processed, 
FRAGMENT_INDEX_ATTRIBUTE, processed.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
         }
-        if (flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && 
flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
-            flowFile = session.putAttribute(flowFile, FRAGMENT_ID_ATTRIBUTE, 
flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE));
+        if (processed.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && 
processed.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
+            processed = session.putAttribute(processed, FRAGMENT_ID_ATTRIBUTE, 
processed.getAttribute(SEGMENT_ID_ATTRIBUTE));
         }
         
-        return flowFile;
+        return processed;
        }
 
        @Override
-       protected String getGroupId(ProcessContext context, FlowFile flowFile) {
-
+       protected String getGroupId(final ProcessContext context, final 
FlowFile flowFile) {
         final String correlationAttributeName = 
context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
         String groupId = (correlationAttributeName == null) ? null : 
flowFile.getAttribute(correlationAttributeName);
 
@@ -260,16 +266,15 @@ public class MergeContent extends BinFiles {
        }
 
        @Override
-       protected void setUpBinManager(BinManager binManager, ProcessContext 
context) {
+       protected void setUpBinManager(final BinManager binManager, final 
ProcessContext context) {
                if 
(MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue()))
 {
             binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
         }
        }
 
        @Override
-       protected boolean processBin(Bin unmodifiableBin,
-                       List<FlowFileSessionWrapper> binCopy, ProcessContext 
context,
-                       ProcessSession session, ProcessorLog logger) throws 
Exception {
+       protected boolean processBin(final Bin unmodifiableBin, final 
List<FlowFileSessionWrapper> binCopy, final ProcessContext context,
+                       final ProcessSession session) throws ProcessException {
 
         final String mergeFormat = 
context.getProperty(MERGE_FORMAT).getValue();
         MergeBin merger;
@@ -314,7 +319,7 @@ public class MergeContent extends BinFiles {
             // Fail the flow files and commit them
             if (error != null) {
                 final String binDescription = binCopy.size() <= 10 ? 
binCopy.toString() : binCopy.size() + " FlowFiles";
-                logger.error(error + "; routing {} to failure", new 
Object[]{binDescription});
+                getLogger().error(error + "; routing {} to failure", new 
Object[]{binDescription});
                 for ( final FlowFileSessionWrapper wrapper : binCopy ) {
                     wrapper.getSession().transfer(wrapper.getFlowFile(), 
REL_FAILURE);
                     wrapper.getSession().commit();
@@ -341,7 +346,7 @@ public class MergeContent extends BinFiles {
         bundle = session.putAllAttributes(bundle, bundleAttributes);
 
         final String inputDescription = (binCopy.size() < 10) ? 
binCopy.toString() : binCopy.size() + " FlowFiles";
-        logger.info("Merged {} into {}", new Object[]{inputDescription, 
bundle});
+        getLogger().info("Merged {} into {}", new Object[]{inputDescription, 
bundle});
         session.transfer(bundle, REL_MERGED);
 
         // We haven't committed anything, parent will take care of it

Reply via email to