[ https://issues.apache.org/jira/browse/NIFI-2565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572096#comment-15572096 ]
ASF GitHub Bot commented on NIFI-2565: -------------------------------------- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1108#discussion_r83228785 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GrokParser.java --- @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.standard; + +import com.fasterxml.jackson.databind.ObjectMapper; +import oi.thekraken.grok.api.Grok; +import oi.thekraken.grok.api.Match; +import oi.thekraken.grok.api.exception.GrokException; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.FlowFile; + +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.BufferedOutputStream; +import org.apache.nifi.stream.io.StreamUtils; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.Collections; + + +@Tags({"Grok Processor"}) +@CapabilityDescription("Use Grok expression ,a la logstash, to parse data.") +@SeeAlso({}) 
+@ReadsAttributes({@ReadsAttribute(attribute="", description="")}) +@WritesAttributes({@WritesAttribute(attribute="", description="")}) +public class GrokParser extends AbstractProcessor { + + + public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute"; + public static final String DESTINATION_CONTENT = "flowfile-content"; + private static final String APPLICATION_JSON = "application/json"; + + public static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor + .Builder().name("Grok Expression") + .description("Grok expression") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor GROK_PATTERN_FILE = new PropertyDescriptor + .Builder().name("Grok Pattern file") + .description("Grok Pattern file definition") + .required(false) + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .build(); + + public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() + .name("Destination") + .description("Control if Grok output value is written as a new flowfile attribute " + + "or written in the flowfile content. Writing to flowfile content will overwrite any " + + "existing flowfile content.") + .required(true) + .allowableValues(DESTINATION_ATTRIBUTE, DESTINATION_CONTENT) + .defaultValue(DESTINATION_ATTRIBUTE) + .build(); + + public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor + .Builder().name("Character Set") + .description("The Character Set in which the file is encoded") + .required(true) + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue("UTF-8") + .build(); + + public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor + .Builder().name("Maximum Buffer Size") + .description("Specifies the maximum amount of data to buffer (per file) in order to apply the Grok expressions. 
Files larger than the specified maximum will not be fully evaluated.") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .addValidator(StandardValidators.createDataSizeBoundsValidator(0, Integer.MAX_VALUE)) + .defaultValue("1 MB") + .build(); + + public static final Relationship REL_MATCH = new Relationship.Builder() + .name("matched") + .description("FlowFiles are routed to this relationship when the Grok Expression is successfully evaluated and the FlowFile is modified as a result") + .build(); + + public static final Relationship REL_NO_MATCH = new Relationship.Builder() + .name("unmatched") + .description("FlowFiles are routed to this relationship when no provided Grok Expression matches the content of the FlowFile") + .build(); + + private List<PropertyDescriptor> descriptors; + + private Set<Relationship> relationships; + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + private Grok grok; + private byte[] buffer; + + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); + descriptors.add(GROK_EXPRESSION); + descriptors.add(GROK_PATTERN_FILE); + descriptors.add(DESTINATION); + descriptors.add(CHARACTER_SET); + descriptors.add(MAX_BUFFER_SIZE); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set<Relationship> relationships = new HashSet<Relationship>(); + relationships.add(REL_MATCH); + relationships.add(REL_NO_MATCH); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + + final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + 
buffer = new byte[maxBufferSize]; + + try{ + grok = Grok.create(context.getProperty(GROK_PATTERN_FILE).getValue()); + grok.compile(context.getProperty(GROK_EXPRESSION).getValue()); + }catch (GrokException e){ + getLogger().error("Failed to initialize ExtractGrok due to: ", e); + } + + + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if ( flowFile == null ) { + return; + } + + final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue()); + final Map<String, String> grokResults = new HashMap<>(); + final byte[] byteBuffer = buffer; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + StreamUtils.fillBuffer(in, byteBuffer, false); + } + }); + final long len = Math.min(byteBuffer.length, flowFile.getSize()); + final String contentString = new String(byteBuffer, 0, (int) len, charset); + + + final Match gm = grok.match(contentString); + gm.captures(); + for(Map.Entry<String,Object> entry: gm.toMap().entrySet()){ + if(null != entry.getValue() ) { + grokResults.put(entry.getKey(), entry.getValue().toString()); + } + } + + if (grokResults.isEmpty()) { + session.transfer(flowFile, REL_NO_MATCH); + getLogger().info("Did not match any Grok Expressions for FlowFile {}", new Object[]{flowFile}); + return ; + } + switch (context.getProperty(DESTINATION).getValue()){ + case DESTINATION_ATTRIBUTE: + + + flowFile = session.putAllAttributes(flowFile, grokResults); + + session.getProvenanceReporter().modifyAttributes(flowFile); --- End diff -- It's probably worth reporting this along with the amount of time that it took to update the attributes. This can be very nice information to have if troubleshooting a flow. 
> NiFi processor to parse logs using Grok patterns > ------------------------------------------------ > > Key: NIFI-2565 > URL: https://issues.apache.org/jira/browse/NIFI-2565 > Project: Apache NiFi > Issue Type: Improvement > Reporter: Andre > Fix For: 1.1.0 > > > Following up on Ryan Ward's nifi-dev message about creating a Grok-capable parser: > https://mail-archives.apache.org/mod_mbox/nifi-dev/201606.mbox/%3CCADD=rnPa8nHkJbeM280=PTQ=wurtwhstm5u+7btoo9pcym2...@mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v6.3.4#6332)