Github user mmiklavc commented on a diff in the pull request: https://github.com/apache/metron/pull/1184#discussion_r223810558 --- Diff: metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java --- @@ -134,26 +144,102 @@ public void init() { @SuppressWarnings("unchecked") @Override public List<JSONObject> parse(byte[] rawMessage) { + Optional<MessageParserResult<JSONObject>> resultOptional = parseOptionalResult(rawMessage); + if (!resultOptional.isPresent()) { + return Collections.EMPTY_LIST; + } + Map<Object,Throwable> errors = resultOptional.get().getMessageThrowables(); + if (!errors.isEmpty()) { + throw new RuntimeException(errors.entrySet().iterator().next().getValue()); + } + + return resultOptional.get().getMessages(); + } + + @SuppressWarnings("unchecked") + @Override + public Optional<MessageParserResult<JSONObject>> parseOptionalResult(byte[] rawMessage) { if (grok == null) { init(); } + if (multiLine) { + return parseMultiLine(rawMessage); + } + return parseSingleLine(rawMessage); + } + + @SuppressWarnings("unchecked") + private Optional<MessageParserResult<JSONObject>> parseMultiLine(byte[] rawMessage) { List<JSONObject> messages = new ArrayList<>(); + Map<Object,Throwable> errors = new HashMap<>(); String originalMessage = null; // read the incoming raw data as if it may have multiple lines of logs // if there is only only one line, it will just get processed. 
try (BufferedReader reader = new BufferedReader(new StringReader(new String(rawMessage, StandardCharsets.UTF_8)))) { while ((originalMessage = reader.readLine()) != null) { LOG.debug("Grok parser parsing message: {}", originalMessage); - Match gm = grok.match(originalMessage); - gm.captures(); - JSONObject message = new JSONObject(); - message.putAll(gm.toMap()); + try { + Match gm = grok.match(originalMessage); + gm.captures(); + JSONObject message = new JSONObject(); + message.putAll(gm.toMap()); - if (message.size() == 0) - throw new RuntimeException("Grok statement produced a null message. Original message was: " - + originalMessage + " and the parsed message was: " + message + " . Check the pattern at: " - + grokPath); + if (message.size() == 0) { + Throwable rte = new RuntimeException("Grok statement produced a null message. Original message was: " + + originalMessage + " and the parsed message was: " + message + " . Check the pattern at: " + + grokPath); + errors.put(originalMessage, rte); + continue; + } + message.put("original_string", originalMessage); + for (String timeField : timeFields) { + String fieldValue = (String) message.get(timeField); + if (fieldValue != null) { + message.put(timeField, toEpoch(fieldValue)); + } + } + if (timestampField != null) { + message.put(Constants.Fields.TIMESTAMP.getName(), formatTimestamp(message.get(timestampField))); + } + message.remove(patternLabel); + postParse(message); + messages.add(message); + LOG.debug("Grok parser parsed message: {}", message); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + errors.put(originalMessage, e); + } + } + } catch (IOException e) { --- End diff -- Is this catch block specifically for exceptions thrown while using the reader? The inner try/catch on Exception appears to handle everything else that is parser-related.
---