[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15962173#comment-15962173 ]
ASF GitHub Bot commented on NIFI-1280:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1652#discussion_r110541830

--- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.grok;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import io.thekraken.grok.api.Grok;
+import io.thekraken.grok.api.GrokUtils;
+import io.thekraken.grok.api.Match;
+
+public class GrokRecordReader implements RecordReader {
+    private final BufferedReader reader;
+    private final Grok grok;
+    private RecordSchema schema;
+
+    private String nextLine;
+
+    static final String STACK_TRACE_COLUMN_NAME = "STACK_TRACE";
omitted)$)"); + + private static final FastDateFormat TIME_FORMAT_DATE; + private static final FastDateFormat TIME_FORMAT_TIME; + private static final FastDateFormat TIME_FORMAT_TIMESTAMP; + + static { + final TimeZone gmt = TimeZone.getTimeZone("GMT"); + TIME_FORMAT_DATE = FastDateFormat.getInstance("yyyy-MM-dd", gmt); + TIME_FORMAT_TIME = FastDateFormat.getInstance("HH:mm:ss", gmt); + TIME_FORMAT_TIMESTAMP = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt); + } + + public GrokRecordReader(final InputStream in, final Grok grok, final RecordSchema schema) { + this.reader = new BufferedReader(new InputStreamReader(in)); + this.grok = grok; + this.schema = schema; + } + + @Override + public void close() throws IOException { + reader.close(); + } + + @Override + public Record nextRecord() throws IOException, MalformedRecordException { + final String line = nextLine == null ? reader.readLine() : nextLine; + nextLine = null; // ensure that we don't process nextLine again + if (line == null) { + return null; + } + + final RecordSchema schema = getSchema(); + + final Match match = grok.match(line); + match.captures(); + final Map<String, Object> valueMap = match.toMap(); + if (valueMap.isEmpty()) { // We were unable to match the pattern so return an empty Object array. + return new MapRecord(schema, Collections.emptyMap()); + } + + // Read the next line to see if it matches the pattern (in which case we will simply leave it for + // the next call to nextRecord()) or we will attach it to the previously read record. + String stackTrace = null; + final StringBuilder toAppend = new StringBuilder(); + while ((nextLine = reader.readLine()) != null) { + final Match nextLineMatch = grok.match(nextLine); + nextLineMatch.captures(); + final Map<String, Object> nextValueMap = nextLineMatch.toMap(); + if (nextValueMap.isEmpty()) { + // next line did not match. Check if it indicates a Stack Trace. If so, read until + // the stack trace ends. Otherwise, append the next line to the last field in the record. + if (isStartOfStackTrace(nextLine)) { + stackTrace = readStackTrace(nextLine); + break; + } else { + toAppend.append("\n").append(nextLine); + } + } else { + // The next line matched our pattern. + break; + } + } + + try { + final List<DataType> fieldTypes = schema.getDataTypes(); + final Map<String, Object> values = new HashMap<>(fieldTypes.size()); + + for (final String fieldName : schema.getFieldNames()) { + final Object value = valueMap.get(fieldName); + if (value == null) { + values.put(fieldName, null); + continue; + } + + final DataType fieldType = schema.getDataType(fieldName).orElse(null); + final Object converted = convert(fieldType, value.toString()); + values.put(fieldName, converted); + } + + final String lastFieldBeforeStackTrace = schema.getFieldNames().get(schema.getFieldCount() - 2); + if (toAppend.length() > 0) { + final Object existingValue = values.get(lastFieldBeforeStackTrace); + final String updatedValue = existingValue == null ? toAppend.toString() : existingValue + toAppend.toString(); + values.put(lastFieldBeforeStackTrace, updatedValue); + } + + values.put(STACK_TRACE_COLUMN_NAME, stackTrace); + + return new MapRecord(schema, values); + } catch (final Exception e) { + throw new MalformedRecordException("Found invalid log record and will skip it. 
Record: " + line, e); + } + } + + + private boolean isStartOfStackTrace(final String line) { + if (line == null) { + return false; + } + + // Stack Traces are generally of the form: + // java.lang.IllegalArgumentException: My message + // at org.apache.nifi.MyClass.myMethod(MyClass.java:48) + // at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60] + // Caused by: java.net.SocketTimeoutException: null + // ... 13 common frames omitted + + int index = line.indexOf("Exception: "); + if (index < 0) { + index = line.indexOf("Error: "); + } + + if (index < 0) { + return false; + } + + if (line.indexOf(" ") < index) { + return false; + } + + return true; + } + + private String readStackTrace(final String firstLine) throws IOException { + final StringBuilder sb = new StringBuilder(firstLine); + + String line; + while ((line = reader.readLine()) != null) { + if (isLineInStackTrace(line)) { + sb.append("\n").append(line); + } else { + nextLine = line; + break; + } + } + + return sb.toString(); + } + + private boolean isLineInStackTrace(final String line) { + return STACK_TRACE_PATTERN.matcher(line).find(); + } + + + protected Object convert(final DataType fieldType, final String string) { + if (fieldType == null) { --- End diff -- Is it possible for 'string' be null? Basically I am wondering about all those `string.length()` checks below and if null checks should be added. > Create QueryFlowFile Processor > ------------------------------ > > Key: NIFI-1280 > URL: https://issues.apache.org/jira/browse/NIFI-1280 > Project: Apache NiFi > Issue Type: Task > Components: Extensions > Reporter: Mark Payne > Assignee: Mark Payne > Fix For: 1.2.0 > > Attachments: QueryFlowFile_Record_Reader-Writer_Examples.xml > > > We should have a Processor that allows users to easily filter out specific > columns from CSV data. For instance, a user would configure two different > properties: "Columns of Interest" (a comma-separated list of column indexes) > and "Filtering Strategy" (Keep Only These Columns, Remove Only These Columns). > We can do this today with ReplaceText, but it is far more difficult than it > would be with this Processor, as the user has to use Regular Expressions, > etc. with ReplaceText. > Eventually a Custom UI could even be built that allows a user to upload a > Sample CSV and choose which columns from there, similar to the way that Excel > works when importing CSV by dragging and selecting the desired columns? That > would certainly be a larger undertaking and would not need to be done for an > initial implementation. -- This message was sent by Atlassian JIRA (v6.3.15#6346)