[
https://issues.apache.org/jira/browse/NIFI-4789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16343711#comment-16343711
]
ASF GitHub Bot commented on NIFI-4789:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2411#discussion_r164499345
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java
---
@@ -209,78 +287,94 @@ public void onTrigger(final ProcessContext context,
final ProcessSession session
try {
final byte[] byteBuffer = buffer;
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(InputStream in) throws IOException {
- StreamUtils.fillBuffer(in, byteBuffer, false);
- }
- });
+ session.read(flowFile, in -> StreamUtils.fillBuffer(in,
byteBuffer, false));
final long len = Math.min(byteBuffer.length,
flowFile.getSize());
contentString = new String(byteBuffer, 0, (int) len, charset);
} finally {
bufferQueue.offer(buffer);
}
- final Match gm = grok.match(contentString);
- gm.captures();
-
- if (gm.toMap().isEmpty()) {
- session.transfer(flowFile, REL_NO_MATCH);
- getLogger().info("Did not match any Grok Expressions for
FlowFile {}", new Object[]{flowFile});
- return;
- }
-
- final ObjectMapper objectMapper = new ObjectMapper();
- switch (context.getProperty(DESTINATION).getValue()) {
- case FLOWFILE_ATTRIBUTE:
- Map<String, String> grokResults = new HashMap<>();
- for (Map.Entry<String, Object> entry :
gm.toMap().entrySet()) {
- if (null != entry.getValue()) {
- grokResults.put("grok." + entry.getKey(),
entry.getValue().toString());
+ try{
+ for (Grok grok : grokList) {
+ final Match gm = grok.match(contentString);
+ gm.captures();
+ final Map<String, Object> localResults = gm.toMap();
+ if (!localResults.isEmpty()) {
+
matchedExpressionList.add(grok.getOriginalGrokPattern());
+ results.putAll(localResults);
+ if (breakOnFirstMatch) {
+ break;
}
}
+ }
- flowFile = session.putAllAttributes(flowFile, grokResults);
- session.getProvenanceReporter().modifyAttributes(flowFile);
- session.transfer(flowFile, REL_MATCH);
- getLogger().info("Matched {} Grok Expressions and added
attributes to FlowFile {}", new Object[]{grokResults.size(), flowFile});
-
- break;
- case FLOWFILE_CONTENT:
- FlowFile conFlowfile = session.write(flowFile, new
StreamCallback() {
- @Override
- public void process(InputStream in, OutputStream out)
throws IOException {
-
out.write(objectMapper.writeValueAsBytes(gm.toMap()));
- }
- });
- conFlowfile = session.putAttribute(conFlowfile,
CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
- session.getProvenanceReporter().modifyContent(conFlowfile,
"Replaced content with parsed Grok fields and values",
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- session.transfer(conFlowfile, REL_MATCH);
+ if (results.isEmpty()) {
+ session.transfer(flowFile, REL_NO_MATCH);
+ getLogger().debug("Did not match any Grok Expression for
FlowFile {}", new Object[]{flowFile});
+ return;
+ }
+
+ String matchedExpressions =
StringUtils.join(matchedExpressionList, expressionSeparator);
+ flowFile = session.putAttribute(flowFile,
matchedExpressionAttribute, matchedExpressions);
- break;
+ switch (context.getProperty(DESTINATION).getValue()) {
+ case FLOWFILE_ATTRIBUTE:
+ Map<String, String> grokResults = new HashMap<>();
+ for (Map.Entry<String, Object> entry :
results.entrySet()) {
+ if (null != entry.getValue()) {
+ grokResults.put(resultPrefix + entry.getKey(),
entry.getValue().toString());
+ }
+ }
+ flowFile = session.putAllAttributes(flowFile,
grokResults);
+
session.getProvenanceReporter().modifyAttributes(flowFile);
+ session.transfer(flowFile, REL_MATCH);
+ getLogger().info("Matched {} Grok Expressions and
added attributes to FlowFile {}", new Object[]{grokResults.size(), flowFile});
+ break;
+ case FLOWFILE_CONTENT:
+ final ObjectMapper objectMapper = new ObjectMapper();
+ FlowFile conFlowfile = session.write(flowFile, (in,
out) -> {
+ out.write(objectMapper.writeValueAsBytes(results));
+ });
+
+ conFlowfile = session.putAttribute(conFlowfile,
CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
+
session.getProvenanceReporter().modifyContent(conFlowfile, "Replaced content
with parsed Grok fields and values",
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ session.transfer(conFlowfile, REL_MATCH);
+ break;
+ }
+ }catch(ProcessException t){
+ flowFile = session.putAttribute(flowFile,
getClass().getSimpleName() + ".exception", t.getMessage());
+ session.transfer(flowFile, REL_NO_MATCH);
+ getLogger().error("Did not match any Grok Expression for
FlowFile {}", new Object[]{flowFile});
+ return;
}
}
+ public static Validator validateMultipleFilesExist() {
+ return (subject, input, context) -> {
+ for (String s : input.split(PATTERN_FILE_LIST_SEPARATOR)) {
--- End diff --
The logic here doesn't appear to be right. As written, this creates a
validator that only validates the first file it encounters and never checks the others.
> Enhance ExtractGrok processor to handle multiple grok expressions
> -----------------------------------------------------------------
>
> Key: NIFI-4789
> URL: https://issues.apache.org/jira/browse/NIFI-4789
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Core Framework
> Affects Versions: 1.2.0, 1.5.0
> Environment: all
> Reporter: Charles Porter
> Priority: Minor
> Labels: features
>
> Many flows require running several grok expressions against an input to
> correctly tag and extract data. Using many separate grok processors to
> accomplish this is unwieldy and hard to maintain. Supporting multiple grok
> expressions delimited by a comma or a user-selected delimiter greatly
> simplifies this.
> The feature is coded and tested, and is ready for a pull request if approved.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)