NIFI-305: Slight refactorings to provide more flexibility in concrete implementations
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/ec7f7e77 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/ec7f7e77 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/ec7f7e77 Branch: refs/heads/NIFI-250 Commit: ec7f7e7717750276a6cbf56edbfcca49b1299fa5 Parents: c01dff5 Author: Mark Payne <[email protected]> Authored: Mon Feb 2 13:51:51 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Mon Feb 2 13:51:51 2015 -0500 ---------------------------------------------------------------------- .../nifi/processors/standard/BinFiles.java | 72 +++----------------- .../nifi/processors/standard/MergeContent.java | 69 ++++++++++--------- 2 files changed, 46 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ec7f7e77/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java index 0a65c59..3d7dba1 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java @@ -19,11 +19,8 @@ package org.apache.nifi.processors.standard; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Queue; -import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -39,7 +36,6 @@ import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; @@ -99,38 +95,9 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The FlowFiles that were used to create the bundle").build(); public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure").build(); - private Set<Relationship> relationships; - private List<PropertyDescriptor> descriptors; private final BinManager binManager = new BinManager(); - private final Queue<Bin> readyBins = new LinkedBlockingQueue<>(); - @Override - protected final void init(final ProcessorInitializationContext context) { - - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_ORIGINAL); - relationships.add(REL_FAILURE); - Set<Relationship> additionalRelationships = defineAdditionalRelationships(); - if (additionalRelationships != null) { - relationships.addAll(additionalRelationships); - } - this.relationships = Collections.unmodifiableSet(relationships); - - final List<PropertyDescriptor> descriptors = new ArrayList<>(); - descriptors.add(MIN_ENTRIES); - descriptors.add(MAX_ENTRIES); - descriptors.add(MIN_SIZE); - descriptors.add(MAX_SIZE); - descriptors.add(MAX_BIN_AGE); - descriptors.add(MAX_BIN_COUNT); - List<PropertyDescriptor> additionalPropertyDescriptors = this.defineAdditionalPropertyDescriptors(); - if (additionalPropertyDescriptors != null) { - descriptors.addAll(additionalPropertyDescriptors); - } - - this.descriptors = Collections.unmodifiableList(descriptors); - } @OnStopped public final void resetState() { @@ -144,27 +111,6 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { } } - @Override - public final Set<Relationship> getRelationships() { - return relationships; - } - - @Override - protected final List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return descriptors; - } - - /** - * Allows any additional relationships to be defined. - * @return Relationships to be added in the init() method - */ - protected abstract Set<Relationship> defineAdditionalRelationships(); - - /** - * Allows any additional property descriptors to be defined. - * @return Properties to be added in the init() method - */ - protected abstract List<PropertyDescriptor> defineAdditionalPropertyDescriptors(); /** * Allows general pre-processing of a flow file before it is offered to a @@ -213,14 +159,14 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { * to Failure and commit their sessions. If false, the * processBins() method will transfer the files to Original and commit * the sessions - * @throws Exception - * This will be handled appropriately, and all flow files in the - * bin will be transferred to failure and the session rolled - * back + * + * @throws ProcessException if any problem arises while processing a bin + * of FlowFiles. All flow files in the + * bin will be transferred to failure and the ProcessSession provided by + * the 'session' argument rolled back */ - protected abstract boolean processBin(Bin unmodifiableBin, - List<FlowFileSessionWrapper> binContents, ProcessContext context, - ProcessSession session, ProcessorLog logger) throws Exception; + protected abstract boolean processBin(Bin unmodifiableBin, + List<FlowFileSessionWrapper> binContents, ProcessContext context, ProcessSession session) throws ProcessException; /** * Allows additional custom validation to be done. This will be called from @@ -288,8 +234,8 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { boolean binAlreadyCommitted = false; try { - binAlreadyCommitted = this.processBin(bin, binCopy, context, session, logger); - } catch (final Exception e) { + binAlreadyCommitted = this.processBin(bin, binCopy, context, session); + } catch (final ProcessException e) { logger.error("Failed to process bundle of {} files due to {}", new Object[]{binCopy.size(), e}); for (final FlowFileSessionWrapper wrapper : binCopy) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ec7f7e77/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java index 73cb5a6..a78bc07 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java @@ -46,10 +46,10 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; @@ -198,56 +198,62 @@ public class MergeContent extends BinFiles { public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); + @Override - protected Set<Relationship> defineAdditionalRelationships() { - final Set<Relationship> relationships = new HashSet<>(); + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_ORIGINAL); + relationships.add(REL_FAILURE); relationships.add(REL_MERGED); - return relationships; - } - + } + + @Override - protected List<PropertyDescriptor> defineAdditionalPropertyDescriptors() { - final List<PropertyDescriptor> descriptors = new ArrayList<>(); - descriptors.add(MERGE_STRATEGY); - descriptors.add(MERGE_FORMAT); - descriptors.add(ATTRIBUTE_STRATEGY); - descriptors.add(CORRELATION_ATTRIBUTE_NAME); + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(MERGE_STRATEGY); + descriptors.add(MERGE_FORMAT); + descriptors.add(ATTRIBUTE_STRATEGY); + descriptors.add(CORRELATION_ATTRIBUTE_NAME); + descriptors.add(MIN_ENTRIES); + descriptors.add(MAX_ENTRIES); + descriptors.add(MIN_SIZE); + descriptors.add(MAX_SIZE); + descriptors.add(MAX_BIN_AGE); + descriptors.add(MAX_BIN_COUNT); descriptors.add(HEADER); descriptors.add(FOOTER); descriptors.add(DEMARCATOR); descriptors.add(COMPRESSION_LEVEL); descriptors.add(KEEP_PATH); - return descriptors; } - + private byte[] readContent(final String filename) throws IOException { return Files.readAllBytes(Paths.get(filename)); } @Override - protected FlowFile preprocessFlowFile(ProcessContext context, - ProcessSession session, FlowFile flowFile) { - + protected FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) { + FlowFile processed = flowFile; // handle backward compatibility with old segment attributes - if (flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) { - flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT_ATTRIBUTE, flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE)); + if (processed.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) { + processed = session.putAttribute(processed, FRAGMENT_COUNT_ATTRIBUTE, processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE)); } - if (flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) { - flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX_ATTRIBUTE, flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE)); + if (processed.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) { + processed = session.putAttribute(processed, FRAGMENT_INDEX_ATTRIBUTE, processed.getAttribute(SEGMENT_INDEX_ATTRIBUTE)); } - if (flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) { - flowFile = session.putAttribute(flowFile, FRAGMENT_ID_ATTRIBUTE, flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE)); + if (processed.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) { + processed = session.putAttribute(processed, FRAGMENT_ID_ATTRIBUTE, processed.getAttribute(SEGMENT_ID_ATTRIBUTE)); } - return flowFile; + return processed; } @Override - protected String getGroupId(ProcessContext context, FlowFile flowFile) { - + protected String getGroupId(final ProcessContext context, final FlowFile flowFile) { final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue(); String groupId = (correlationAttributeName == null) ? null : flowFile.getAttribute(correlationAttributeName); @@ -260,16 +266,15 @@ public class MergeContent extends BinFiles { } @Override - protected void setUpBinManager(BinManager binManager, ProcessContext context) { + protected void setUpBinManager(final BinManager binManager, final ProcessContext context) { if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) { binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE); } } @Override - protected boolean processBin(Bin unmodifiableBin, - List<FlowFileSessionWrapper> binCopy, ProcessContext context, - ProcessSession session, ProcessorLog logger) throws Exception { + protected boolean processBin(final Bin unmodifiableBin, final List<FlowFileSessionWrapper> binCopy, final ProcessContext context, + final ProcessSession session) throws ProcessException { final String mergeFormat = context.getProperty(MERGE_FORMAT).getValue(); MergeBin merger; @@ -314,7 +319,7 @@ public class MergeContent extends BinFiles { // Fail the flow files and commit them if (error != null) { final String binDescription = binCopy.size() <= 10 ? binCopy.toString() : binCopy.size() + " FlowFiles"; - logger.error(error + "; routing {} to failure", new Object[]{binDescription}); + getLogger().error(error + "; routing {} to failure", new Object[]{binDescription}); for ( final FlowFileSessionWrapper wrapper : binCopy ) { wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE); wrapper.getSession().commit(); @@ -341,7 +346,7 @@ public class MergeContent extends BinFiles { bundle = session.putAllAttributes(bundle, bundleAttributes); final String inputDescription = (binCopy.size() < 10) ? binCopy.toString() : binCopy.size() + " FlowFiles"; - logger.info("Merged {} into {}", new Object[]{inputDescription, bundle}); + getLogger().info("Merged {} into {}", new Object[]{inputDescription, bundle}); session.transfer(bundle, REL_MERGED); // We haven't committed anything, parent will take care of it
