[
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15962805#comment-15962805
]
ASF GitHub Bot commented on NIFI-1280:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1652#discussion_r110646586
--- Diff:
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.grok;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import io.thekraken.grok.api.Grok;
+import io.thekraken.grok.api.GrokUtils;
+import io.thekraken.grok.api.Match;
+
+public class GrokRecordReader implements RecordReader {
+ private final BufferedReader reader;
+ private final Grok grok;
+ private RecordSchema schema;
+
+ private String nextLine;
+
+ static final String STACK_TRACE_COLUMN_NAME = "STACK_TRACE";
+ private static final Pattern STACK_TRACE_PATTERN = Pattern.compile(
+ "^\\s*(?:(?: |\\t)+at )|"
+ + "(?:(?: |\\t)+\\[CIRCULAR REFERENCE\\:)|"
+ + "(?:Caused by\\: )|"
+ + "(?:Suppressed\\: )|"
+ + "(?:\\s+... \\d+ (?:more|common frames? omitted)$)");
+
+ private static final FastDateFormat TIME_FORMAT_DATE;
+ private static final FastDateFormat TIME_FORMAT_TIME;
+ private static final FastDateFormat TIME_FORMAT_TIMESTAMP;
+
+ static {
+ final TimeZone gmt = TimeZone.getTimeZone("GMT");
+ TIME_FORMAT_DATE = FastDateFormat.getInstance("yyyy-MM-dd", gmt);
+ TIME_FORMAT_TIME = FastDateFormat.getInstance("HH:mm:ss", gmt);
+ TIME_FORMAT_TIMESTAMP = FastDateFormat.getInstance("yyyy-MM-dd
HH:mm:ss", gmt);
+ }
+
+ public GrokRecordReader(final InputStream in, final Grok grok, final
RecordSchema schema) {
+ this.reader = new BufferedReader(new InputStreamReader(in));
+ this.grok = grok;
+ this.schema = schema;
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ @Override
+ public Record nextRecord() throws IOException,
MalformedRecordException {
+ final String line = nextLine == null ? reader.readLine() :
nextLine;
+ nextLine = null; // ensure that we don't process nextLine again
+ if (line == null) {
+ return null;
+ }
+
+ final RecordSchema schema = getSchema();
+
+ final Match match = grok.match(line);
+ match.captures();
+ final Map<String, Object> valueMap = match.toMap();
+ if (valueMap.isEmpty()) { // We were unable to match the pattern
so return an empty Object array.
+ return new MapRecord(schema, Collections.emptyMap());
+ }
+
+ // Read the next line to see if it matches the pattern (in which
case we will simply leave it for
+ // the next call to nextRecord()) or we will attach it to the
previously read record.
+ String stackTrace = null;
+ final StringBuilder toAppend = new StringBuilder();
+ while ((nextLine = reader.readLine()) != null) {
+ final Match nextLineMatch = grok.match(nextLine);
+ nextLineMatch.captures();
+ final Map<String, Object> nextValueMap = nextLineMatch.toMap();
+ if (nextValueMap.isEmpty()) {
+ // next line did not match. Check if it indicates a Stack
Trace. If so, read until
+ // the stack trace ends. Otherwise, append the next line
to the last field in the record.
+ if (isStartOfStackTrace(nextLine)) {
+ stackTrace = readStackTrace(nextLine);
+ break;
+ } else {
+ toAppend.append("\n").append(nextLine);
+ }
+ } else {
+ // The next line matched our pattern.
+ break;
+ }
+ }
+
+ try {
+ final List<DataType> fieldTypes = schema.getDataTypes();
+ final Map<String, Object> values = new
HashMap<>(fieldTypes.size());
+
+ for (final String fieldName : schema.getFieldNames()) {
+ final Object value = valueMap.get(fieldName);
+ if (value == null) {
+ values.put(fieldName, null);
+ continue;
+ }
+
+ final DataType fieldType =
schema.getDataType(fieldName).orElse(null);
+ final Object converted = convert(fieldType,
value.toString());
+ values.put(fieldName, converted);
+ }
+
+ final String lastFieldBeforeStackTrace =
schema.getFieldNames().get(schema.getFieldCount() - 2);
+ if (toAppend.length() > 0) {
+ final Object existingValue =
values.get(lastFieldBeforeStackTrace);
+ final String updatedValue = existingValue == null ?
toAppend.toString() : existingValue + toAppend.toString();
+ values.put(lastFieldBeforeStackTrace, updatedValue);
+ }
+
+ values.put(STACK_TRACE_COLUMN_NAME, stackTrace);
+
+ return new MapRecord(schema, values);
+ } catch (final Exception e) {
+ throw new MalformedRecordException("Found invalid log record
and will skip it. Record: " + line, e);
+ }
+ }
+
+
+ private boolean isStartOfStackTrace(final String line) {
+ if (line == null) {
+ return false;
+ }
+
+ // Stack Traces are generally of the form:
+ // java.lang.IllegalArgumentException: My message
+ // at org.apache.nifi.MyClass.myMethod(MyClass.java:48)
+ // at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
+ // Caused by: java.net.SocketTimeoutException: null
+ // ... 13 common frames omitted
--- End diff --
I did. While debugging I had to generate stack traces a few times, so I added a
comment that includes this example to make future debugging easier.
> Create QueryFlowFile Processor
> ------------------------------
>
> Key: NIFI-1280
> URL: https://issues.apache.org/jira/browse/NIFI-1280
> Project: Apache NiFi
> Issue Type: Task
> Components: Extensions
> Reporter: Mark Payne
> Assignee: Mark Payne
> Fix For: 1.2.0
>
> Attachments: QueryFlowFile_Record_Reader-Writer_Examples.xml
>
>
> We should have a Processor that allows users to easily filter out specific
> columns from CSV data. For instance, a user would configure two different
> properties: "Columns of Interest" (a comma-separated list of column indexes)
> and "Filtering Strategy" (Keep Only These Columns, Remove Only These Columns).
> We can do this today with ReplaceText, but it is far more difficult than it
> would be with this Processor, as the user has to use Regular Expressions,
> etc. with ReplaceText.
> Eventually a custom UI could even be built that allows a user to upload a
> sample CSV and choose the desired columns from there by dragging and
> selecting them, similar to the way that Excel works when importing CSV. That
> would certainly be a larger undertaking and would not need to be done for an
> initial implementation.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)