Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1652#discussion_r110646586
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
 ---
    @@ -0,0 +1,318 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.grok;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TimeZone;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import org.apache.commons.lang3.time.FastDateFormat;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.DataType;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import io.thekraken.grok.api.Grok;
    +import io.thekraken.grok.api.GrokUtils;
    +import io.thekraken.grok.api.Match;
    +
    +public class GrokRecordReader implements RecordReader {
    private final BufferedReader reader;
    private final Grok grok;
    private RecordSchema schema;

    // One-line look-ahead buffer: nextRecord() reads past the current record to
    // find its end; the overshoot line is stashed here (null = nothing buffered).
    private String nextLine;

    static final String STACK_TRACE_COLUMN_NAME = "STACK_TRACE";

    // Recognizes lines belonging to a Java stack trace:
    //   "    at ..." / "\tat ..." frames,
    //   "[CIRCULAR REFERENCE:" markers,
    //   "Caused by: " and "Suppressed: " headers,
    //   trailing "... N more" / "... N common frame(s) omitted" lines.
    private static final Pattern STACK_TRACE_PATTERN = Pattern.compile(
        "^\\s*(?:(?:    |\\t)+at )|"
            + "(?:(?:    |\\t)+\\[CIRCULAR REFERENCE\\:)|"
            + "(?:Caused by\\: )|"
            + "(?:Suppressed\\: )|"
            + "(?:\\s+... \\d+ (?:more|common frames? omitted)$)");

    // Formatters for DATE / TIME / TIMESTAMP field values. FastDateFormat is
    // safe to share as a static (unlike SimpleDateFormat, per commons-lang docs).
    private static final FastDateFormat TIME_FORMAT_DATE;
    private static final FastDateFormat TIME_FORMAT_TIME;
    private static final FastDateFormat TIME_FORMAT_TIMESTAMP;

    static {
        // Parse all date/time values in GMT so results do not depend on the
        // JVM's local time zone.
        final TimeZone gmt = TimeZone.getTimeZone("GMT");
        TIME_FORMAT_DATE = FastDateFormat.getInstance("yyyy-MM-dd", gmt);
        TIME_FORMAT_TIME = FastDateFormat.getInstance("HH:mm:ss", gmt);
        TIME_FORMAT_TIMESTAMP = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt);
    }
    +
    /**
     * Creates a reader that matches each line of the given stream against the supplied
     * Grok expression and maps the captured groups onto the given record schema.
     *
     * @param in the log text to read; decoded as UTF-8
     * @param grok the compiled Grok expression used to match each line
     * @param schema the schema whose field names correspond to the Grok capture names
     */
    public GrokRecordReader(final InputStream in, final Grok grok, final RecordSchema schema) {
        // Specify the charset explicitly: the no-charset InputStreamReader constructor
        // uses the platform default, which would make parsing results machine-dependent.
        this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        this.grok = grok;
        this.schema = schema;
    }
    +
    +    @Override
    +    public void close() throws IOException {
    +        reader.close();
    +    }
    +
    +    @Override
    +    public Record nextRecord() throws IOException, 
MalformedRecordException {
    +        final String line = nextLine == null ? reader.readLine() : 
nextLine;
    +        nextLine = null; // ensure that we don't process nextLine again
    +        if (line == null) {
    +            return null;
    +        }
    +
    +        final RecordSchema schema = getSchema();
    +
    +        final Match match = grok.match(line);
    +        match.captures();
    +        final Map<String, Object> valueMap = match.toMap();
    +        if (valueMap.isEmpty()) {   // We were unable to match the pattern 
so return an empty Object array.
    +            return new MapRecord(schema, Collections.emptyMap());
    +        }
    +
    +        // Read the next line to see if it matches the pattern (in which 
case we will simply leave it for
    +        // the next call to nextRecord()) or we will attach it to the 
previously read record.
    +        String stackTrace = null;
    +        final StringBuilder toAppend = new StringBuilder();
    +        while ((nextLine = reader.readLine()) != null) {
    +            final Match nextLineMatch = grok.match(nextLine);
    +            nextLineMatch.captures();
    +            final Map<String, Object> nextValueMap = nextLineMatch.toMap();
    +            if (nextValueMap.isEmpty()) {
    +                // next line did not match. Check if it indicates a Stack 
Trace. If so, read until
    +                // the stack trace ends. Otherwise, append the next line 
to the last field in the record.
    +                if (isStartOfStackTrace(nextLine)) {
    +                    stackTrace = readStackTrace(nextLine);
    +                    break;
    +                } else {
    +                    toAppend.append("\n").append(nextLine);
    +                }
    +            } else {
    +                // The next line matched our pattern.
    +                break;
    +            }
    +        }
    +
    +        try {
    +            final List<DataType> fieldTypes = schema.getDataTypes();
    +            final Map<String, Object> values = new 
HashMap<>(fieldTypes.size());
    +
    +            for (final String fieldName : schema.getFieldNames()) {
    +                final Object value = valueMap.get(fieldName);
    +                if (value == null) {
    +                    values.put(fieldName, null);
    +                    continue;
    +                }
    +
    +                final DataType fieldType = 
schema.getDataType(fieldName).orElse(null);
    +                final Object converted = convert(fieldType, 
value.toString());
    +                values.put(fieldName, converted);
    +            }
    +
    +            final String lastFieldBeforeStackTrace = 
schema.getFieldNames().get(schema.getFieldCount() - 2);
    +            if (toAppend.length() > 0) {
    +                final Object existingValue = 
values.get(lastFieldBeforeStackTrace);
    +                final String updatedValue = existingValue == null ? 
toAppend.toString() : existingValue + toAppend.toString();
    +                values.put(lastFieldBeforeStackTrace, updatedValue);
    +            }
    +
    +            values.put(STACK_TRACE_COLUMN_NAME, stackTrace);
    +
    +            return new MapRecord(schema, values);
    +        } catch (final Exception e) {
    +            throw new MalformedRecordException("Found invalid log record 
and will skip it. Record: " + line, e);
    +        }
    +    }
    +
    +
    +    private boolean isStartOfStackTrace(final String line) {
    +        if (line == null) {
    +            return false;
    +        }
    +
    +        // Stack Traces are generally of the form:
    +        // java.lang.IllegalArgumentException: My message
    +        //   at org.apache.nifi.MyClass.myMethod(MyClass.java:48)
    +        //   at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
    +        // Caused by: java.net.SocketTimeoutException: null
    +        //   ... 13 common frames omitted
    --- End diff --
    
    I did. While debugging I had to generate stack traces a few times, so I added
a comment that includes this example to make future debugging easier.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to