http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnContent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnContent.java index 44d090b..8f1eb4e 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnContent.java @@ -60,9 +60,9 @@ import org.apache.nifi.util.IntegerHolder; + "of the property is the name of the relationship and the value is a Regular Expression to match against the FlowFile " + "content. User-Defined properties do support the Attribute Expression Language, but the results are interpreted as " + "literal values, not Regular Expressions") -@DynamicProperty(name="Relationship Name", value="A Regular Expression", supportsExpressionLanguage=true, description="Routes FlowFiles whose " + - "content matches the regular expressoin defined by Dynamic Property's value to the Relationship defined by the Dynamic Property's key") -@DynamicRelationship(name="Name from Dynamic Property", description="FlowFiles that match the Dynamic Property's Regular Expression") +@DynamicProperty(name = "Relationship Name", value = "A Regular Expression", supportsExpressionLanguage = true, description = "Routes FlowFiles whose " + + "content matches the regular expressoin defined by Dynamic Property's value to the Relationship defined by the Dynamic Property's key") +@DynamicRelationship(name = "Name from Dynamic Property", description = "FlowFiles that match the Dynamic Property's Regular Expression") public class RouteOnContent extends AbstractProcessor { public static final String ROUTE_ATTRIBUTE_KEY = "RouteOnContent.Route"; @@ -70,30 +70,34 @@ public class RouteOnContent extends AbstractProcessor { public static final String MATCH_ALL = "content must match exactly"; public static final String MATCH_SUBSEQUENCE = "content must contain match"; - public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder() - .name("Content Buffer Size") - .description("Specifies the maximum amount of data to buffer in order to apply the regular expressions. If the size of the FlowFile exceeds this value, any amount of this value will be ignored") - .required(true) - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .defaultValue("1 MB") - .build(); - public static final PropertyDescriptor MATCH_REQUIREMENT = new PropertyDescriptor.Builder() - .name("Match Requirement") - .description("Specifies whether the entire content of the file must match the regular expression exactly, or if any part of the file (up to Content Buffer Size) can contain the regular expression in order to be considered a match") - .required(true) - .allowableValues(MATCH_ALL, MATCH_SUBSEQUENCE) - .defaultValue(MATCH_ALL) - .build(); - public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() - .name("Character Set") - .description("The Character Set in which the file is encoded") - .required(true) - .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) - .defaultValue("UTF-8") - .build(); - - public static final Relationship REL_NO_MATCH = new Relationship.Builder().name("unmatched") - .description("FlowFiles that do not match any of the user-supplied regular expressions will be routed to this relationship").build(); + public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder(). + name("Content Buffer Size"). + description("Specifies the maximum amount of data to buffer in order to apply the regular expressions. If the size of the FlowFile " + + "exceeds this value, any amount of this value will be ignored"). + required(true). + addValidator(StandardValidators.DATA_SIZE_VALIDATOR). + defaultValue("1 MB"). + build(); + public static final PropertyDescriptor MATCH_REQUIREMENT = new PropertyDescriptor.Builder(). + name("Match Requirement"). + description("Specifies whether the entire content of the file must match the regular expression exactly, or if any part of the file " + + "(up to Content Buffer Size) can contain the regular expression in order to be considered a match"). + required(true). + allowableValues(MATCH_ALL, MATCH_SUBSEQUENCE). + defaultValue(MATCH_ALL). + build(); + public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder(). + name("Character Set"). + description("The Character Set in which the file is encoded"). + required(true). + addValidator(StandardValidators.CHARACTER_SET_VALIDATOR). + defaultValue("UTF-8"). + build(); + + public static final Relationship REL_NO_MATCH = new Relationship.Builder(). + name("unmatched"). + description("FlowFiles that do not match any of the user-supplied regular expressions will be routed to this relationship"). + build(); private final AtomicReference<Set<Relationship>> relationships = new AtomicReference<>(); private List<PropertyDescriptor> properties; @@ -128,19 +132,23 @@ public class RouteOnContent extends AbstractProcessor { } return new PropertyDescriptor.Builder() - .required(false) - .name(propertyDescriptorName) - .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true)) - .dynamic(true) - .expressionLanguageSupported(true) - .build(); + .required(false). + name(propertyDescriptorName). + addValidator(StandardValidators. + createRegexValidator(0, Integer.MAX_VALUE, true)). + dynamic(true). + expressionLanguageSupported(true). + build(); } @Override public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { if (descriptor.isDynamic()) { - final Set<Relationship> relationships = new HashSet<>(this.relationships.get()); - final Relationship relationship = new Relationship.Builder().name(descriptor.getName()).build(); + final Set<Relationship> relationships = new HashSet<>(this.relationships. + get()); + final Relationship relationship = new Relationship.Builder(). + name(descriptor.getName()). + build(); if (newValue == null) { relationships.remove(relationship); @@ -162,15 +170,20 @@ public class RouteOnContent extends AbstractProcessor { final AttributeValueDecorator quoteDecorator = new AttributeValueDecorator() { @Override public String decorate(final String attributeValue) { - return (attributeValue == null) ? null : Pattern.quote(attributeValue); + return (attributeValue == null) ? null : Pattern. + quote(attributeValue); } }; final Map<FlowFile, Set<Relationship>> flowFileDestinationMap = new HashMap<>(); final ProcessorLog logger = getLogger(); - final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue()); - final byte[] buffer = new byte[context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B).intValue()]; + final Charset charset = Charset.forName(context. + getProperty(CHARACTER_SET). + getValue()); + final byte[] buffer = new byte[context.getProperty(BUFFER_SIZE). + asDataSize(DataUnit.B). + intValue()]; for (final FlowFile flowFile : flowFiles) { final Set<Relationship> destinations = new HashSet<>(); flowFileDestinationMap.put(flowFile, destinations); @@ -179,58 +192,82 @@ public class RouteOnContent extends AbstractProcessor { session.read(flowFile, new InputStreamCallback() { @Override public void process(final InputStream in) throws IOException { - bufferedByteCount.set(StreamUtils.fillBuffer(in, buffer, false)); + bufferedByteCount.set(StreamUtils. + fillBuffer(in, buffer, false)); } }); - final String contentString = new String(buffer, 0, bufferedByteCount.get(), charset); + final String contentString = new String(buffer, 0, bufferedByteCount. + get(), charset); - for (final PropertyDescriptor descriptor : context.getProperties().keySet()) { + for (final PropertyDescriptor descriptor : context.getProperties(). + keySet()) { if (!descriptor.isDynamic()) { continue; } - final String regex = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile, quoteDecorator).getValue(); + final String regex = context.getProperty(descriptor). + evaluateAttributeExpressions(flowFile, quoteDecorator). + getValue(); final Pattern pattern = Pattern.compile(regex); final boolean matches; - if (context.getProperty(MATCH_REQUIREMENT).getValue().equalsIgnoreCase(MATCH_ALL)) { - matches = pattern.matcher(contentString).matches(); + if (context.getProperty(MATCH_REQUIREMENT). + getValue(). + equalsIgnoreCase(MATCH_ALL)) { + matches = pattern.matcher(contentString). + matches(); } else { - matches = pattern.matcher(contentString).find(); + matches = pattern.matcher(contentString). + find(); } if (matches) { - final Relationship relationship = new Relationship.Builder().name(descriptor.getName()).build(); + final Relationship relationship = new Relationship.Builder(). + name(descriptor.getName()). + build(); destinations.add(relationship); } } } - for (final Map.Entry<FlowFile, Set<Relationship>> entry : flowFileDestinationMap.entrySet()) { + for (final Map.Entry<FlowFile, Set<Relationship>> entry : flowFileDestinationMap. + entrySet()) { FlowFile flowFile = entry.getKey(); final Set<Relationship> destinations = entry.getValue(); if (destinations.isEmpty()) { - flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, REL_NO_MATCH.getName()); + flowFile = session. + putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, REL_NO_MATCH. + getName()); session.transfer(flowFile, REL_NO_MATCH); - session.getProvenanceReporter().route(flowFile, REL_NO_MATCH); + session.getProvenanceReporter(). + route(flowFile, REL_NO_MATCH); logger.info("Routing {} to 'unmatched'", new Object[]{flowFile}); } else { - final Relationship firstRelationship = destinations.iterator().next(); + final Relationship firstRelationship = destinations.iterator(). + next(); destinations.remove(firstRelationship); for (final Relationship relationship : destinations) { FlowFile clone = session.clone(flowFile); - clone = session.putAttribute(clone, ROUTE_ATTRIBUTE_KEY, relationship.getName()); - session.getProvenanceReporter().route(clone, relationship); + clone = session. + putAttribute(clone, ROUTE_ATTRIBUTE_KEY, relationship. + getName()); + session.getProvenanceReporter(). + route(clone, relationship); session.transfer(clone, relationship); - logger.info("Cloning {} to {} and routing clone to {}", new Object[]{flowFile, clone, relationship}); + logger. + info("Cloning {} to {} and routing clone to {}", new Object[]{flowFile, clone, relationship}); } - flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, firstRelationship.getName()); - session.getProvenanceReporter().route(flowFile, firstRelationship); + flowFile = session. + putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, firstRelationship. + getName()); + session.getProvenanceReporter(). + route(flowFile, firstRelationship); session.transfer(flowFile, firstRelationship); - logger.info("Routing {} to {}", new Object[]{flowFile, firstRelationship}); + logger. + info("Routing {} to {}", new Object[]{flowFile, firstRelationship}); } } }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java index 6d48d02..53ed961 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java @@ -62,35 +62,35 @@ public class ScanAttribute extends AbstractProcessor { public static final String MATCH_CRITERIA_ALL = "All Must Match"; public static final String MATCH_CRITERIA_ANY = "At Least 1 Must Match"; - public static final PropertyDescriptor MATCHING_CRITERIA = new PropertyDescriptor.Builder() - .name("Match Criteria") - .description("If set to All Must Match, then FlowFiles will be routed to 'matched' only if all specified " + public static final PropertyDescriptor MATCHING_CRITERIA = new PropertyDescriptor.Builder(). + name("Match Criteria"). + description("If set to All Must Match, then FlowFiles will be routed to 'matched' only if all specified " + "attributes' values are found in the dictionary. If set to At Least 1 Must Match, FlowFiles will " - + "be routed to 'matched' if any attribute specified is found in the dictionary") - .required(true) - .allowableValues(MATCH_CRITERIA_ANY, MATCH_CRITERIA_ALL) - .defaultValue(MATCH_CRITERIA_ANY) - .build(); - public static final PropertyDescriptor ATTRIBUTE_PATTERN = new PropertyDescriptor.Builder() - .name("Attribute Pattern") - .description("Regular Expression that specifies the names of attributes whose values will be matched against the terms in the dictionary") - .required(true) - .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) - .defaultValue(".*") - .build(); - public static final PropertyDescriptor DICTIONARY_FILE = new PropertyDescriptor.Builder() - .name("Dictionary File") - .description("A new-line-delimited text file that includes the terms that should trigger a match. Empty lines are ignored.") - .required(true) - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .build(); - public static final PropertyDescriptor DICTIONARY_FILTER = new PropertyDescriptor.Builder() - .name("Dictionary Filter Pattern") - .description("A Regular Expression that will be applied to each line in the dictionary file. If the regular expression does not match the line, the line will not be included in the list of terms to search for. If a Matching Group is specified, only the portion of the term that matches that Matching Group will be used instead of the entire term. If not specified, all terms in the dictionary will be used and each term will consist of the text of the entire line in the file") - .required(false) - .addValidator(StandardValidators.createRegexValidator(0, 1, false)) - .defaultValue(null) - .build(); + + "be routed to 'matched' if any attribute specified is found in the dictionary"). + required(true). + allowableValues(MATCH_CRITERIA_ANY, MATCH_CRITERIA_ALL). + defaultValue(MATCH_CRITERIA_ANY). + build(); + public static final PropertyDescriptor ATTRIBUTE_PATTERN = new PropertyDescriptor.Builder(). + name("Attribute Pattern"). + description("Regular Expression that specifies the names of attributes whose values will be matched against the terms in the dictionary"). + required(true). + addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR). + defaultValue(".*"). + build(); + public static final PropertyDescriptor DICTIONARY_FILE = new PropertyDescriptor.Builder(). + name("Dictionary File"). + description("A new-line-delimited text file that includes the terms that should trigger a match. Empty lines are ignored."). + required(true). + addValidator(StandardValidators.FILE_EXISTS_VALIDATOR). + build(); + public static final PropertyDescriptor DICTIONARY_FILTER = new PropertyDescriptor.Builder(). + name("Dictionary Filter Pattern"). + description("A Regular Expression that will be applied to each line in the dictionary file. If the regular expression does not match the line, the line will not be included in the list of terms to search for. If a Matching Group is specified, only the portion of the term that matches that Matching Group will be used instead of the entire term. If not specified, all terms in the dictionary will be used and each term will consist of the text of the entire line in the file"). + required(false). + addValidator(StandardValidators.createRegexValidator(0, 1, false)). + defaultValue(null). + build(); private List<PropertyDescriptor> properties; private Set<Relationship> relationships; @@ -100,8 +100,14 @@ public class ScanAttribute extends AbstractProcessor { private volatile Set<String> dictionaryTerms = null; private volatile SynchronousFileWatcher fileWatcher = null; - public static final Relationship REL_MATCHED = new Relationship.Builder().name("matched").description("FlowFiles whose attributes are found in the dictionary will be routed to this relationship").build(); - public static final Relationship REL_UNMATCHED = new Relationship.Builder().name("unmatched").description("FlowFiles whose attributes are not found in the dictionary will be routed to this relationship").build(); + public static final Relationship REL_MATCHED = new Relationship.Builder(). + name("matched"). + description("FlowFiles whose attributes are found in the dictionary will be routed to this relationship"). + build(); + public static final Relationship REL_UNMATCHED = new Relationship.Builder(). + name("unmatched"). + description("FlowFiles whose attributes are not found in the dictionary will be routed to this relationship"). + build(); @Override protected void init(final ProcessorInitializationContext context) { @@ -130,32 +136,41 @@ public class ScanAttribute extends AbstractProcessor { @OnScheduled public void onScheduled(final ProcessContext context) throws IOException { - final String filterRegex = context.getProperty(DICTIONARY_FILTER).getValue(); - this.dictionaryFilterPattern = (filterRegex == null) ? null : Pattern.compile(filterRegex); + final String filterRegex = context.getProperty(DICTIONARY_FILTER). + getValue(); + this.dictionaryFilterPattern = (filterRegex == null) ? null : Pattern. + compile(filterRegex); - final String attributeRegex = context.getProperty(ATTRIBUTE_PATTERN).getValue(); - this.attributePattern = (attributeRegex.equals(".*")) ? null : Pattern.compile(attributeRegex); + final String attributeRegex = context.getProperty(ATTRIBUTE_PATTERN). + getValue(); + this.attributePattern = (attributeRegex.equals(".*")) ? null : Pattern. + compile(attributeRegex); this.dictionaryTerms = createDictionary(context); - this.fileWatcher = new SynchronousFileWatcher(Paths.get(context.getProperty(DICTIONARY_FILE).getValue()), new LastModifiedMonitor(), 1000L); + this.fileWatcher = new SynchronousFileWatcher(Paths.get(context. + getProperty(DICTIONARY_FILE). + getValue()), new LastModifiedMonitor(), 1000L); } private Set<String> createDictionary(final ProcessContext context) throws IOException { final Set<String> terms = new HashSet<>(); - final File file = new File(context.getProperty(DICTIONARY_FILE).getValue()); + final File file = new File(context.getProperty(DICTIONARY_FILE). + getValue()); try (final InputStream fis = new FileInputStream(file); final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) { String line; while ((line = reader.readLine()) != null) { - if (line.trim().isEmpty()) { + if (line.trim(). + isEmpty()) { continue; } String matchingTerm = line; if (dictionaryFilterPattern != null) { - final Matcher matcher = dictionaryFilterPattern.matcher(line); + final Matcher matcher = dictionaryFilterPattern. + matcher(line); if (!matcher.matches()) { continue; } @@ -192,20 +207,27 @@ public class ScanAttribute extends AbstractProcessor { logger.error("Unable to reload dictionary due to {}", e); } - final boolean matchAll = context.getProperty(MATCHING_CRITERIA).getValue().equals(MATCH_CRITERIA_ALL); + final boolean matchAll = context.getProperty(MATCHING_CRITERIA). + getValue(). + equals(MATCH_CRITERIA_ALL); for (final FlowFile flowFile : flowFiles) { final boolean matched = matchAll ? allMatch(flowFile, attributePattern, dictionaryTerms) : anyMatch(flowFile, attributePattern, dictionaryTerms); final Relationship relationship = matched ? REL_MATCHED : REL_UNMATCHED; - session.getProvenanceReporter().route(flowFile, relationship); + session.getProvenanceReporter(). + route(flowFile, relationship); session.transfer(flowFile, relationship); - logger.info("Transferred {} to {}", new Object[]{flowFile, relationship}); + logger. + info("Transferred {} to {}", new Object[]{flowFile, relationship}); } } private boolean allMatch(final FlowFile flowFile, final Pattern attributePattern, final Set<String> dictionary) { - for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) { - if (attributePattern == null || attributePattern.matcher(entry.getKey()).matches()) { + for (final Map.Entry<String, String> entry : flowFile.getAttributes(). + entrySet()) { + if (attributePattern == null || attributePattern.matcher(entry. + getKey()). + matches()) { if (!dictionary.contains(entry.getValue())) { return false; } @@ -216,8 +238,11 @@ public class ScanAttribute extends AbstractProcessor { } private boolean anyMatch(final FlowFile flowFile, final Pattern attributePattern, final Set<String> dictionary) { - for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) { - if (attributePattern == null || attributePattern.matcher(entry.getKey()).matches()) { + for (final Map.Entry<String, String> entry : flowFile.getAttributes(). + entrySet()) { + if (attributePattern == null || attributePattern.matcher(entry. + getKey()). + matches()) { if (dictionary.contains(entry.getValue())) { return true; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanContent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanContent.java index ea2e6c2..28d48ad 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanContent.java @@ -64,30 +64,41 @@ import org.apache.nifi.util.search.ahocorasick.SearchState; @SideEffectFree @SupportsBatching @Tags({"aho-corasick", "scan", "content", "byte sequence", "search", "find", "dictionary"}) -@CapabilityDescription("Scans the content of FlowFiles for terms that are found in a user-supplied dictionary. If a term is matched, the UTF-8 encoded version of the term will be added to the FlowFile using the 'matching.term' attribute") -@WritesAttribute(attribute="matching.term", description="The term that caused the Processor to route the FlowFile to the 'matched' relationship; if FlowFile is routed to the 'unmatched' relationship, this attribute is not added") +@CapabilityDescription("Scans the content of FlowFiles for terms that are found in a user-supplied dictionary. If a term is matched, the UTF-8 " + + "encoded version of the term will be added to the FlowFile using the 'matching.term' attribute") +@WritesAttribute(attribute = "matching.term", description = "The term that caused the Processor to route the FlowFile to the 'matched' relationship; " + + "if FlowFile is routed to the 'unmatched' relationship, this attribute is not added") public class ScanContent extends AbstractProcessor { public static final String TEXT_ENCODING = "text"; public static final String BINARY_ENCODING = "binary"; public static final String MATCH_ATTRIBUTE_KEY = "matching.term"; - public static final PropertyDescriptor DICTIONARY = new PropertyDescriptor.Builder() - .name("Dictionary File") - .description("The filename of the terms dictionary") - .required(true) - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .build(); - public static final PropertyDescriptor DICTIONARY_ENCODING = new PropertyDescriptor.Builder() - .name("Dictionary Encoding") - .description("Indicates how the dictionary is encoded. If 'text', dictionary terms are new-line delimited and UTF-8 encoded; if 'binary', dictionary terms are denoted by a 4-byte integer indicating the term length followed by the term itself") - .required(true) - .allowableValues(TEXT_ENCODING, BINARY_ENCODING) - .defaultValue(TEXT_ENCODING) - .build(); - - public static final Relationship REL_MATCH = new Relationship.Builder().name("matched").description("FlowFiles that match at least one term in the dictionary are routed to this relationship").build(); - public static final Relationship REL_NO_MATCH = new Relationship.Builder().name("unmatched").description("FlowFiles that do not match any term in the dictionary are routed to this relationship").build(); + public static final PropertyDescriptor DICTIONARY = new PropertyDescriptor.Builder(). + name("Dictionary File"). + description("The filename of the terms dictionary"). + required(true). + addValidator(StandardValidators.FILE_EXISTS_VALIDATOR). + build(); + public static final PropertyDescriptor DICTIONARY_ENCODING = new PropertyDescriptor.Builder(). + name("Dictionary Encoding"). + description("Indicates how the dictionary is encoded. If 'text', dictionary terms are new-line delimited and UTF-8 encoded; " + + "if 'binary', dictionary terms are denoted by a 4-byte integer indicating the term length followed by the term itself"). + required(true). + allowableValues(TEXT_ENCODING, BINARY_ENCODING). + defaultValue(TEXT_ENCODING). + build(); + + public static final Relationship REL_MATCH = new Relationship.Builder(). + name("matched"). + description("FlowFiles that match at least one " + + "term in the dictionary are routed to this relationship"). + build(); + public static final Relationship REL_NO_MATCH = new Relationship.Builder(). + name("unmatched"). + description("FlowFiles that do not match any " + + "term in the dictionary are routed to this relationship"). + build(); public static final Charset UTF8 = Charset.forName("UTF-8"); @@ -124,7 +135,8 @@ public class ScanContent extends AbstractProcessor { @Override public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { if (descriptor.equals(DICTIONARY)) { - fileWatcherRef.set(new SynchronousFileWatcher(Paths.get(newValue), new LastModifiedMonitor(), 60000L)); + fileWatcherRef. + set(new SynchronousFileWatcher(Paths.get(newValue), new LastModifiedMonitor(), 60000L)); } } @@ -142,10 +154,14 @@ public class ScanContent extends AbstractProcessor { final Search<byte[]> search = new AhoCorasick<>(); final Set<SearchTerm<byte[]>> terms = new HashSet<>(); - final InputStream inStream = Files.newInputStream(Paths.get(context.getProperty(DICTIONARY).getValue()), StandardOpenOption.READ); + final InputStream inStream = Files.newInputStream(Paths. + get(context.getProperty(DICTIONARY). + getValue()), StandardOpenOption.READ); final TermLoader termLoader; - if (context.getProperty(DICTIONARY_ENCODING).getValue().equalsIgnoreCase(TEXT_ENCODING)) { + if (context.getProperty(DICTIONARY_ENCODING). + getValue(). + equalsIgnoreCase(TEXT_ENCODING)) { termLoader = new TextualTermLoader(inStream); } else { termLoader = new BinaryTermLoader(inStream); @@ -159,7 +175,10 @@ public class ScanContent extends AbstractProcessor { search.initializeDictionary(terms); searchRef.set(search); - logger.info("Loaded search dictionary from {}", new Object[]{context.getProperty(DICTIONARY).getValue()}); + logger. + info("Loaded search dictionary from {}", new Object[]{context. + getProperty(DICTIONARY). + getValue()}); return true; } finally { termLoader.close(); @@ -212,9 +231,13 @@ public class ScanContent extends AbstractProcessor { @Override public void process(final InputStream rawIn) throws IOException { try (final InputStream in = new BufferedInputStream(rawIn)) { - final SearchState<byte[]> searchResult = finalSearch.search(in, false); + final SearchState<byte[]> searchResult = finalSearch. + search(in, false); if (searchResult.foundMatch()) { - termRef.set(searchResult.getResults().keySet().iterator().next()); + termRef.set(searchResult.getResults(). + keySet(). + iterator(). + next()); } } } @@ -223,13 +246,17 @@ public class ScanContent extends AbstractProcessor { final SearchTerm<byte[]> matchingTerm = termRef.get(); if (matchingTerm == null) { logger.info("Routing {} to 'unmatched'", new Object[]{flowFile}); - session.getProvenanceReporter().route(flowFile, REL_NO_MATCH); + session.getProvenanceReporter(). + route(flowFile, REL_NO_MATCH); session.transfer(flowFile, REL_NO_MATCH); } else { final String matchingTermString = matchingTerm.toString(UTF8); - logger.info("Routing {} to 'matched' because it matched term {}", new Object[]{flowFile, matchingTermString}); - flowFile = session.putAttribute(flowFile, MATCH_ATTRIBUTE_KEY, matchingTermString); - session.getProvenanceReporter().route(flowFile, REL_MATCH); + logger. + info("Routing {} to 'matched' because it matched term {}", new Object[]{flowFile, matchingTermString}); + flowFile = session. + putAttribute(flowFile, MATCH_ATTRIBUTE_KEY, matchingTermString); + session.getProvenanceReporter(). + route(flowFile, REL_MATCH); session.transfer(flowFile, REL_MATCH); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java index 59aece0..071f6fb 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java @@ -51,14 +51,26 @@ import org.apache.nifi.processor.util.StandardValidators; @CapabilityDescription("Segments a FlowFile into multiple smaller segments on byte boundaries. Each segment is given the following attributes: " + "fragment.identifier, fragment.index, fragment.count, segment.original.filename; these attributes can then be used by the " + "MergeContent processor in order to reconstitute the original FlowFile") -@WritesAttributes({ @WritesAttribute(attribute = "segment.identifier", description = "All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute. This attribute is added to maintain backward compatibility, but the fragment.identifier is preferred, as it is designed to work in conjunction with the MergeContent Processor"), - @WritesAttribute(attribute = "segment.index", description = "A one-up number that indicates the ordering of the segments that were created from a single parent FlowFile. This attribute is added to maintain backward compatibility, but the fragment.index is preferred, as it is designed to work in conjunction with the MergeContent Processor"), - @WritesAttribute(attribute = "segment.count", description = "The number of segments generated from the parent FlowFile. This attribute is added to maintain backward compatibility, but the fragment.count is preferred, as it is designed to work in conjunction with the MergeContent Processor"), - @WritesAttribute(attribute = "fragment.identifier", description = "All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"), - @WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the segments that were created from a single parent FlowFile"), - @WritesAttribute(attribute = "fragment.count", description = "The number of segments generated from the parent FlowFile"), - @WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile"), - @WritesAttribute(attribute = "segment.original.filename ", description = "The filename will be updated to include the parent's filename, the segment index, and the segment count") }) +@WritesAttributes({ + @WritesAttribute(attribute = "segment.identifier", + description = "All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this " + + "attribute. This attribute is added to maintain backward compatibility, but the fragment.identifier is preferred, as " + + "it is designed to work in conjunction with the MergeContent Processor"), + @WritesAttribute(attribute = "segment.index", + description = "A one-up number that indicates the ordering of the segments that were created from a single parent FlowFile. " + + "This attribute is added to maintain backward compatibility, but the fragment.index is preferred, as it is designed " + + "to work in conjunction with the MergeContent Processor"), + @WritesAttribute(attribute = "segment.count", + description = "The number of segments generated from the parent FlowFile. This attribute is added to maintain backward compatibility, " + + "but the fragment.count is preferred, as it is designed to work in conjunction with the MergeContent Processor"), + @WritesAttribute(attribute = "fragment.identifier", + description = "All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"), + @WritesAttribute(attribute = "fragment.index", + description = "A one-up number that indicates the ordering of the segments that were created from a single parent FlowFile"), + @WritesAttribute(attribute = "fragment.count", description = "The number of segments generated from the parent FlowFile"), + @WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile"), + @WritesAttribute(attribute = "segment.original.filename ", + description = "The filename will be updated to include the parent's filename, the segment index, and the segment count")}) @SeeAlso(MergeContent.class) public class SegmentContent extends AbstractProcessor { @@ -71,15 +83,22 @@ public class SegmentContent extends AbstractProcessor { public static final String FRAGMENT_INDEX = "fragment.index"; public static final String FRAGMENT_COUNT = "fragment.count"; - public static final PropertyDescriptor SIZE = new PropertyDescriptor.Builder() - .name("Segment Size") - .description("The maximum data size for each segment") - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .required(true) - .build(); - - public static final Relationship REL_SEGMENTS = new Relationship.Builder().name("segments").description("All segments will be sent to this relationship. If the file was small enough that it was not segmented, a copy of the original is sent to this relationship as well as original").build(); - public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile will be sent to this relationship").build(); + public static final PropertyDescriptor SIZE = new PropertyDescriptor.Builder(). + name("Segment Size"). + description("The maximum data size for each segment"). + addValidator(StandardValidators.DATA_SIZE_VALIDATOR). + required(true). + build(); + + public static final Relationship REL_SEGMENTS = new Relationship.Builder(). + name("segments"). + description("All segments will be sent to this relationship. If the file was small enough that it was not segmented, " + + "a copy of the original is sent to this relationship as well as original"). + build(); + public static final Relationship REL_ORIGINAL = new Relationship.Builder(). + name("original"). + description("The original FlowFile will be sent to this relationship"). + build(); private Set<Relationship> relationships; private List<PropertyDescriptor> propertyDescriptors; @@ -113,16 +132,21 @@ public class SegmentContent extends AbstractProcessor { return; } - final String segmentId = UUID.randomUUID().toString(); - final long segmentSize = context.getProperty(SIZE).asDataSize(DataUnit.B).longValue(); + final String segmentId = UUID.randomUUID(). + toString(); + final long segmentSize = context.getProperty(SIZE). + asDataSize(DataUnit.B). + longValue(); - final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + final String originalFileName = flowFile. + getAttribute(CoreAttributes.FILENAME.key()); if (flowFile.getSize() <= segmentSize) { flowFile = session.putAttribute(flowFile, SEGMENT_ID, segmentId); flowFile = session.putAttribute(flowFile, SEGMENT_INDEX, "1"); flowFile = session.putAttribute(flowFile, SEGMENT_COUNT, "1"); - flowFile = session.putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName); + flowFile = session. + putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName); flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId); flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX, "1"); @@ -150,7 +174,8 @@ public class SegmentContent extends AbstractProcessor { final Set<FlowFile> segmentSet = new HashSet<>(); for (int i = 1; i <= totalSegments; i++) { final long segmentOffset = segmentSize * (i - 1); - FlowFile segment = session.clone(flowFile, segmentOffset, Math.min(segmentSize, flowFile.getSize() - segmentOffset)); + FlowFile segment = session.clone(flowFile, segmentOffset, Math. + min(segmentSize, flowFile.getSize() - segmentOffset)); segmentAttributes.put(SEGMENT_INDEX, String.valueOf(i)); segmentAttributes.put(FRAGMENT_INDEX, String.valueOf(i)); segment = session.putAllAttributes(segment, segmentAttributes); @@ -161,9 +186,11 @@ public class SegmentContent extends AbstractProcessor { session.transfer(flowFile, REL_ORIGINAL); if (totalSegments <= 10) { - getLogger().info("Segmented {} into {} segments: {}", new Object[]{flowFile, totalSegments, segmentSet}); + getLogger(). + info("Segmented {} into {} segments: {}", new Object[]{flowFile, totalSegments, segmentSet}); } else { - getLogger().info("Segmented {} into {} segments", new Object[]{flowFile, totalSegments}); + getLogger(). + info("Segmented {} into {} segments", new Object[]{flowFile, totalSegments}); } } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java index 419e12d..1c9a8c5 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java @@ -65,10 +65,11 @@ import org.apache.nifi.util.Tuple; @SupportsBatching @Tags({"content", "split", "binary"}) @CapabilityDescription("Splits incoming FlowFiles by a specified byte sequence") -@WritesAttributes({ @WritesAttribute(attribute = "fragment.identifier", description = "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"), - @WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"), - @WritesAttribute(attribute = "fragment.count", description = "The number of split FlowFiles generated from the parent FlowFile"), - @WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile") }) +@WritesAttributes({ + @WritesAttribute(attribute = "fragment.identifier", description = "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"), + @WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"), + @WritesAttribute(attribute = "fragment.count", description = "The number of split FlowFiles generated from the parent FlowFile"), + @WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile")}) @SeeAlso(MergeContent.class) public class SplitContent extends AbstractProcessor { @@ -80,46 +81,47 @@ public class SplitContent extends AbstractProcessor { static final AllowableValue HEX_FORMAT = new AllowableValue("Hexadecimal", "Hexadecimal", "The Byte Sequence will be interpreted as a hexadecimal representation of bytes"); static final AllowableValue UTF8_FORMAT = new AllowableValue("Text", "Text", "The Byte Sequence will be interpreted as UTF-8 Encoded text"); - + static final AllowableValue TRAILING_POSITION = new AllowableValue("Trailing", "Trailing", "Keep the Byte Sequence at the end of the first split if <Keep Byte Sequence> is true"); static final AllowableValue LEADING_POSITION = new AllowableValue("Leading", "Leading", "Keep the Byte Sequence at the beginning of the second split if <Keep Byte Sequence> is true"); - - public static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder() - .name("Byte Sequence Format") - .description("Specifies how the <Byte Sequence> property should be interpreted") - .required(true) - .allowableValues(HEX_FORMAT, UTF8_FORMAT) - .defaultValue(HEX_FORMAT.getValue()) - .build(); - public static final PropertyDescriptor BYTE_SEQUENCE = new PropertyDescriptor.Builder() - .name("Byte Sequence") - .description("A representation of bytes to look for and upon which to split the source file into separate files") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .required(true) - .build(); - public static final PropertyDescriptor KEEP_SEQUENCE = new PropertyDescriptor.Builder() - .name("Keep Byte Sequence") - .description("Determines whether or not the Byte Sequence should be included with each Split") - .required(true) - .allowableValues("true", "false") - .defaultValue("false") - .build(); - public static final PropertyDescriptor BYTE_SEQUENCE_LOCATION = new PropertyDescriptor.Builder() - .name("Byte Sequence Location") - .description("If <Keep Byte Sequence> is set to true, specifies whether the byte sequence should be added to the end of the first split or the beginning of the second; if <Keep Byte Sequence> is false, this property is ignored.") - .required(true) - .allowableValues(TRAILING_POSITION, LEADING_POSITION) - .defaultValue(TRAILING_POSITION.getValue()) - .build(); + + public static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder(). + name("Byte Sequence Format"). + description("Specifies how the <Byte Sequence> property should be interpreted"). + required(true). + allowableValues(HEX_FORMAT, UTF8_FORMAT). + defaultValue(HEX_FORMAT.getValue()). + build(); + public static final PropertyDescriptor BYTE_SEQUENCE = new PropertyDescriptor.Builder(). + name("Byte Sequence"). + description("A representation of bytes to look for and upon which to split the source file into separate files"). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + required(true). + build(); + public static final PropertyDescriptor KEEP_SEQUENCE = new PropertyDescriptor.Builder(). + name("Keep Byte Sequence"). + description("Determines whether or not the Byte Sequence should be included with each Split"). + required(true). + allowableValues("true", "false"). + defaultValue("false"). + build(); + public static final PropertyDescriptor BYTE_SEQUENCE_LOCATION = new PropertyDescriptor.Builder(). + name("Byte Sequence Location"). + description("If <Keep Byte Sequence> is set to true, specifies whether the byte sequence should be added to the end of the first " + + "split or the beginning of the second; if <Keep Byte Sequence> is false, this property is ignored."). + required(true). + allowableValues(TRAILING_POSITION, LEADING_POSITION). + defaultValue(TRAILING_POSITION.getValue()). + build(); public static final Relationship REL_SPLITS = new Relationship.Builder() - .name("splits") - .description("All Splits will be routed to the splits relationship") - .build(); + .name("splits"). + description("All Splits will be routed to the splits relationship"). + build(); public static final Relationship REL_ORIGINAL = new Relationship.Builder() - .name("original") - .description("The original file") - .build(); + .name("original"). + description("The original file"). + build(); private Set<Relationship> relationships; private List<PropertyDescriptor> properties; @@ -154,22 +156,29 @@ public class SplitContent extends AbstractProcessor { @Override protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { final List<ValidationResult> results = new ArrayList<>(1); - final String format = validationContext.getProperty(FORMAT).getValue(); - if ( HEX_FORMAT.getValue().equals(format) ) { - final String byteSequence = validationContext.getProperty(BYTE_SEQUENCE).getValue(); - final ValidationResult result = new HexStringPropertyValidator().validate(BYTE_SEQUENCE.getName(), byteSequence, validationContext); + final String format = validationContext.getProperty(FORMAT). + getValue(); + if (HEX_FORMAT.getValue(). + equals(format)) { + final String byteSequence = validationContext. + getProperty(BYTE_SEQUENCE). + getValue(); + final ValidationResult result = new HexStringPropertyValidator(). + validate(BYTE_SEQUENCE.getName(), byteSequence, validationContext); results.add(result); } return results; } - @OnScheduled public void initializeByteSequence(final ProcessContext context) throws DecoderException { - final String bytePattern = context.getProperty(BYTE_SEQUENCE).getValue(); - - final String format = context.getProperty(FORMAT).getValue(); - if ( HEX_FORMAT.getValue().equals(format) ) { + final String bytePattern = context.getProperty(BYTE_SEQUENCE). + getValue(); + + final String format = context.getProperty(FORMAT). + getValue(); + if (HEX_FORMAT.getValue(). + equals(format)) { this.byteSequence.set(Hex.decodeHex(bytePattern.toCharArray())); } else { this.byteSequence.set(bytePattern.getBytes(StandardCharsets.UTF_8)); @@ -184,11 +193,14 @@ public class SplitContent extends AbstractProcessor { } final ProcessorLog logger = getLogger(); - final boolean keepSequence = context.getProperty(KEEP_SEQUENCE).asBoolean(); + final boolean keepSequence = context.getProperty(KEEP_SEQUENCE). + asBoolean(); final boolean keepTrailingSequence; final boolean keepLeadingSequence; - if ( keepSequence ) { - if ( context.getProperty(BYTE_SEQUENCE_LOCATION).getValue().equals(TRAILING_POSITION.getValue()) ) { + if (keepSequence) { + if (context.getProperty(BYTE_SEQUENCE_LOCATION). + getValue(). + equals(TRAILING_POSITION.getValue())) { keepTrailingSequence = true; keepLeadingSequence = false; } else { @@ -199,10 +211,11 @@ public class SplitContent extends AbstractProcessor { keepTrailingSequence = false; keepLeadingSequence = false; } - + final byte[] byteSequence = this.byteSequence.get(); if (byteSequence == null) { // should never happen. But just in case... - logger.error("{} Unable to obtain Byte Sequence", new Object[]{this}); + logger. + error("{} Unable to obtain Byte Sequence", new Object[]{this}); session.rollback(); return; } @@ -224,7 +237,8 @@ public class SplitContent extends AbstractProcessor { } bytesRead++; - boolean matched = buffer.addAndCompare((byte) (nextByte & 0xFF)); + boolean matched = buffer. + addAndCompare((byte) (nextByte & 0xFF)); if (matched) { long splitLength; @@ -234,10 +248,10 @@ public class SplitContent extends AbstractProcessor { splitLength = bytesRead - startOffset - byteSequence.length; } - if ( keepLeadingSequence && startOffset > 0 ) { + if (keepLeadingSequence && startOffset > 0) { splitLength += byteSequence.length; } - + final long splitStart = (keepLeadingSequence && startOffset > 0) ? startOffset - byteSequence.length : startOffset; splits.add(new Tuple<>(splitStart, splitLength)); startOffset = bytesRead; @@ -253,7 +267,8 @@ public class SplitContent extends AbstractProcessor { FlowFile clone = session.clone(flowFile); session.transfer(flowFile, REL_ORIGINAL); session.transfer(clone, REL_SPLITS); - logger.info("Found no match for {}; transferring original 'original' and transferring clone {} to 'splits'", new Object[]{flowFile, clone}); + logger. + info("Found no match for {}; transferring original 'original' and transferring clone {} to 'splits'", new Object[]{flowFile, clone}); return; } @@ -277,7 +292,8 @@ public class SplitContent extends AbstractProcessor { finalSplitOffset += byteSequence.length; } if (finalSplitOffset > -1L && finalSplitOffset < flowFile.getSize()) { - FlowFile finalSplit = session.clone(flowFile, finalSplitOffset, flowFile.getSize() - finalSplitOffset); + FlowFile finalSplit = session. + clone(flowFile, finalSplitOffset, flowFile.getSize() - finalSplitOffset); splitList.add(finalSplit); } @@ -286,23 +302,29 @@ public class SplitContent extends AbstractProcessor { session.transfer(flowFile, REL_ORIGINAL); if (splitList.size() > 10) { - logger.info("Split {} into {} files", new Object[]{flowFile, splitList.size()}); + logger. + info("Split {} into {} files", new Object[]{flowFile, splitList. + size()}); } else { - logger.info("Split {} into {} files: {}", new Object[]{flowFile, splitList.size(), splitList}); + logger. + info("Split {} into {} files: {}", new Object[]{flowFile, splitList. + size(), splitList}); } } /** * Apply split index, count and other attributes. * - * @param session - * @param source - * @param unpacked + * @param session session + * @param source source + * @param splits splits */ private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) { - final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key()); + final String originalFilename = source. + getAttribute(CoreAttributes.FILENAME.key()); - final String fragmentId = UUID.randomUUID().toString(); + final String fragmentId = UUID.randomUUID(). + toString(); final ArrayList<FlowFile> newList = new ArrayList<>(splits); splits.clear(); for (int i = 1; i <= newList.size(); i++) { @@ -323,9 +345,16 @@ public class SplitContent extends AbstractProcessor { public ValidationResult validate(final String subject, final String input, final ValidationContext validationContext) { try { Hex.decodeHex(input.toCharArray()); - return new ValidationResult.Builder().valid(true).input(input).subject(subject).build(); + return new ValidationResult.Builder().valid(true). + input(input). + subject(subject). + build(); } catch (final Exception e) { - return new ValidationResult.Builder().valid(false).explanation("Not a valid Hex String").input(input).subject(subject).build(); + return new ValidationResult.Builder().valid(false). + explanation("Not a valid Hex String"). + input(input). + subject(subject). + build(); } } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java index bba770a..2ffebd5 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java @@ -41,7 +41,12 @@ import org.apache.nifi.processor.util.StandardValidators; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @EventDriven @@ -54,16 +59,28 @@ import java.util.concurrent.atomic.AtomicReference; + "does not evaluate to an array element, the original file is routed to 'failure' and no files are generated.") public class SplitJson extends AbstractJsonPathProcessor { - public static final PropertyDescriptor ARRAY_JSON_PATH_EXPRESSION = new PropertyDescriptor.Builder() - .name("JsonPath Expression") - .description("A JsonPath expression that indicates the array element to split into JSON/scalar fragments.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // Full validation/caching occurs in #customValidate - .required(true) - .build(); - - public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship").build(); - public static final Relationship REL_SPLIT = new Relationship.Builder().name("split").description("All segments of the original FlowFile will be routed to this relationship").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON or the specified path does not exist), it will be routed to this relationship").build(); + public static final PropertyDescriptor ARRAY_JSON_PATH_EXPRESSION = new PropertyDescriptor.Builder(). + name("JsonPath Expression"). + description("A JsonPath expression that indicates the array element to split into JSON/scalar fragments."). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // Full validation/caching occurs in #customValidate + . + required(true). + build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder(). + name("original"). + description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to " + + "this relationship"). + build(); + public static final Relationship REL_SPLIT = new Relationship.Builder(). + name("split"). + description("All segments of the original FlowFile will be routed to this relationship"). + build(); + public static final Relationship REL_FAILURE = new Relationship.Builder(). + name("failure"). + description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON or the specified " + + "path does not exist), it will be routed to this relationship"). + build(); private List<PropertyDescriptor> properties; private Set<Relationship> relationships; @@ -120,8 +137,10 @@ public class SplitJson extends AbstractJsonPathProcessor { } }; - String value = validationContext.getProperty(ARRAY_JSON_PATH_EXPRESSION).getValue(); - return Collections.singleton(validator.validate(ARRAY_JSON_PATH_EXPRESSION.getName(), value, validationContext)); + String value = validationContext.getProperty(ARRAY_JSON_PATH_EXPRESSION). + getValue(); + return Collections.singleton(validator. + validate(ARRAY_JSON_PATH_EXPRESSION.getName(), value, validationContext)); } @Override @@ -137,14 +156,18 @@ public class SplitJson extends AbstractJsonPathProcessor { try { documentContext = validateAndEstablishJsonContext(processSession, original); } catch (InvalidJsonException e) { - logger.error("FlowFile {} did not have valid JSON content.", new Object[]{original}); + logger. + error("FlowFile {} did not have valid JSON content.", new Object[]{original}); processSession.transfer(original, REL_FAILURE); return; } final JsonPath jsonPath = JSON_PATH_REF.get(); - String representationOption = processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue(); - final String nullDefaultValue = NULL_REPRESENTATION_MAP.get(representationOption); + String representationOption = processContext. + getProperty(NULL_VALUE_DEFAULT_REPRESENTATION). + getValue(); + final String nullDefaultValue = NULL_REPRESENTATION_MAP. + get(representationOption); final List<FlowFile> segments = new ArrayList<>(); @@ -152,14 +175,17 @@ public class SplitJson extends AbstractJsonPathProcessor { try { jsonPathResult = documentContext.read(jsonPath); } catch (PathNotFoundException e) { - logger.warn("JsonPath {} could not be found for FlowFile {}", new Object[]{jsonPath.getPath(), original}); + logger. + warn("JsonPath {} could not be found for FlowFile {}", new Object[]{jsonPath. + getPath(), original}); processSession.transfer(original, REL_FAILURE); return; } if (!(jsonPathResult instanceof List)) { - logger.error("The evaluated value {} of {} was not a JSON Array compatible type and cannot be split.", - new Object[]{jsonPathResult, jsonPath.getPath()}); + logger. + error("The evaluated value {} of {} was not a JSON Array compatible type and cannot be split.", + new Object[]{jsonPathResult, jsonPath.getPath()}); processSession.transfer(original, REL_FAILURE); return; } @@ -172,16 +198,20 @@ public class SplitJson extends AbstractJsonPathProcessor { @Override public void process(OutputStream out) throws IOException { String resultSegmentContent = getResultRepresentation(resultSegment, nullDefaultValue); - out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8)); + out.write(resultSegmentContent. + getBytes(StandardCharsets.UTF_8)); } }); segments.add(split); } - processSession.getProvenanceReporter().fork(original, segments); + processSession.getProvenanceReporter(). + fork(original, segments); processSession.transfer(segments, REL_SPLIT); processSession.transfer(original, REL_ORIGINAL); - logger.info("Split {} into {} FlowFiles", new Object[]{original, segments.size()}); + logger. + info("Split {} into {} FlowFiles", new Object[]{original, segments. + size()}); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java index d8f7400..f68ef4e 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java @@ -62,11 +62,11 @@ import java.util.UUID; @Tags({"split", "text"}) @CapabilityDescription("Splits a text file into multiple smaller text files on line boundaries, each having up to a configured number of lines") @WritesAttributes({ - @WritesAttribute(attribute = "text.line.count", description = "The number of lines of text from the original FlowFile that were copied to this FlowFile"), - @WritesAttribute(attribute = "fragment.identifier", description = "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"), - @WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"), - @WritesAttribute(attribute = "fragment.count", description = "The number of split FlowFiles generated from the parent FlowFile"), - @WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile") }) + @WritesAttribute(attribute = "text.line.count", description = "The number of lines of text from the original FlowFile that were copied to this FlowFile"), + @WritesAttribute(attribute = "fragment.identifier", description = "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"), + @WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"), + @WritesAttribute(attribute = "fragment.count", description = "The number of split FlowFiles generated from the parent FlowFile"), + @WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile")}) @SeeAlso(MergeContent.class) public class SplitText extends AbstractProcessor { @@ -77,32 +77,41 @@ public class SplitText extends AbstractProcessor { public static final String FRAGMENT_COUNT = "fragment.count"; public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename"; - public static final PropertyDescriptor LINE_SPLIT_COUNT = new PropertyDescriptor.Builder() - .name("Line Split Count") - .description("The number of lines that will be added to each split file") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .build(); - public static final PropertyDescriptor HEADER_LINE_COUNT = new PropertyDescriptor.Builder() - .name("Header Line Count") - .description("The number of lines that should be considered part of the header; the header lines will be duplicated to all split files") - .required(true) - .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) - .defaultValue("0") - .build(); - public static final PropertyDescriptor REMOVE_TRAILING_NEWLINES = new PropertyDescriptor.Builder() - .name("Remove Trailing Newlines") - .description( - "Whether to remove newlines at the end of each split file. This should be false if you intend to merge the split files later") - .required(true) - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .allowableValues("true", "false") - .defaultValue("true") - .build(); - - public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original input file will be routed to this destination when it has been successfully split into 1 or more files").build(); - public static final Relationship REL_SPLITS = new Relationship.Builder().name("splits").description("The split files will be routed to this destination when an input file is successfully split into 1 or more split files").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere").build(); + public static final PropertyDescriptor LINE_SPLIT_COUNT = new PropertyDescriptor.Builder(). + name("Line Split Count"). + description("The number of lines that will be added to each split file"). + required(true). + addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR). + build(); + public static final PropertyDescriptor HEADER_LINE_COUNT = new PropertyDescriptor.Builder(). + name("Header Line Count"). + description("The number of lines that should be considered part of the header; the header lines will be duplicated to all split files"). + required(true). + addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR). + defaultValue("0"). + build(); + public static final PropertyDescriptor REMOVE_TRAILING_NEWLINES = new PropertyDescriptor.Builder(). + name("Remove Trailing Newlines"). + description( + "Whether to remove newlines at the end of each split file. This should be false if you intend to merge the split files later"). + required(true). + addValidator(StandardValidators.BOOLEAN_VALIDATOR). + allowableValues("true", "false"). + defaultValue("true"). + build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder(). + name("original"). + description("The original input file will be routed to this destination when it has been successfully split into 1 or more files"). + build(); + public static final Relationship REL_SPLITS = new Relationship.Builder(). + name("splits"). + description("The split files will be routed to this destination when an input file is successfully split into 1 or more split files"). + build(); + public static final Relationship REL_FAILURE = new Relationship.Builder(). + name("failure"). + description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere"). + build(); private List<PropertyDescriptor> properties; private Set<Relationship> relationships; @@ -226,9 +235,13 @@ public class SplitText extends AbstractProcessor { } final ProcessorLog logger = getLogger(); - final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger(); - final int splitCount = context.getProperty(LINE_SPLIT_COUNT).asInteger(); - final boolean removeTrailingNewlines = context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean(); + final int headerCount = context.getProperty(HEADER_LINE_COUNT). + asInteger(); + final int splitCount = context.getProperty(LINE_SPLIT_COUNT). + asInteger(); + final boolean removeTrailingNewlines = context. + getProperty(REMOVE_TRAILING_NEWLINES). + asBoolean(); final ObjectHolder<String> errorMessage = new ObjectHolder<>(null); final ArrayList<SplitInfo> splitInfos = new ArrayList<>(); @@ -245,7 +258,8 @@ public class SplitText extends AbstractProcessor { final ByteArrayOutputStream headerStream = new ByteArrayOutputStream(); final int headerLinesCopied = readLines(in, headerCount, headerStream, true); if (headerLinesCopied < headerCount) { - errorMessage.set("Header Line Count is set to " + headerCount + " but file had only " + headerLinesCopied + " lines"); + errorMessage. + set("Header Line Count is set to " + headerCount + " but file had only " + headerLinesCopied + " lines"); return; } @@ -256,17 +270,23 @@ public class SplitText extends AbstractProcessor { final IntegerHolder linesCopied = new IntegerHolder(0); FlowFile splitFile = session.create(flowFile); try { - splitFile = session.write(splitFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream rawOut) throws IOException { - try (final BufferedOutputStream out = new BufferedOutputStream(rawOut)) { - headerStream.writeTo(out); - linesCopied.set(readLines(in, splitCount, out, !removeTrailingNewlines)); - } - } - }); - splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get())); - logger.debug("Created Split File {} with {} lines", new Object[]{splitFile, linesCopied.get()}); + splitFile = session. + write(splitFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream rawOut) throws IOException { + try (final BufferedOutputStream out = new BufferedOutputStream(rawOut)) { + headerStream.writeTo(out); + linesCopied. + set(readLines(in, splitCount, out, !removeTrailingNewlines)); + } + } + }); + splitFile = session. + putAttribute(splitFile, SPLIT_LINE_COUNT, String. + valueOf(linesCopied.get())); + logger. + debug("Created Split File {} with {} lines", new Object[]{splitFile, linesCopied. + get()}); } finally { if (linesCopied.get() > 0) { splits.add(splitFile); @@ -293,8 +313,11 @@ public class SplitText extends AbstractProcessor { info.offsetBytes = beforeReadingLines; splitInfos.add(info); final long procNanos = System.nanoTime() - startNanos; - final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS); - logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; total splits = {}; total processing time = {} ms", new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis}); + final long procMillis = TimeUnit.MILLISECONDS. + convert(procNanos, TimeUnit.NANOSECONDS); + logger. + debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; total splits = {}; total processing time = {} ms", new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos. + size(), procMillis}); } } } @@ -303,7 +326,9 @@ public class SplitText extends AbstractProcessor { }); if (errorMessage.get() != null) { - logger.error("Unable to split {} due to {}; routing to failure", new Object[]{flowFile, errorMessage.get()}); + logger. + error("Unable to split {} due to {}; routing to failure", new Object[]{flowFile, errorMessage. + get()}); session.transfer(flowFile, REL_FAILURE); if (splits != null && !splits.isEmpty()) { session.remove(splits); @@ -314,17 +339,22 @@ public class SplitText extends AbstractProcessor { if (!splitInfos.isEmpty()) { // Create the splits for (final SplitInfo info : splitInfos) { - FlowFile split = session.clone(flowFile, info.offsetBytes, info.lengthBytes); - split = session.putAttribute(split, SPLIT_LINE_COUNT, String.valueOf(info.lengthLines)); + FlowFile split = session. + clone(flowFile, info.offsetBytes, info.lengthBytes); + split = session.putAttribute(split, SPLIT_LINE_COUNT, String. + valueOf(info.lengthLines)); splits.add(split); } } finishFragmentAttributes(session, flowFile, splits); if (splits.size() > 10) { - logger.info("Split {} into {} files", new Object[]{flowFile, splits.size()}); + logger.info("Split {} into {} files", new Object[]{flowFile, splits. + size()}); } else { - logger.info("Split {} into {} files: {}", new Object[]{flowFile, splits.size(), splits}); + logger. + info("Split {} into {} files: {}", new Object[]{flowFile, splits. + size(), splits}); } session.transfer(flowFile, REL_ORIGINAL); @@ -339,9 +369,11 @@ public class SplitText extends AbstractProcessor { * @param unpacked */ private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) { - final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key()); + final String originalFilename = source. + getAttribute(CoreAttributes.FILENAME.key()); - final String fragmentId = UUID.randomUUID().toString(); + final String fragmentId = UUID.randomUUID(). + toString(); final ArrayList<FlowFile> newList = new ArrayList<>(splits); splits.clear(); for (int i = 1; i <= newList.size(); i++) {
