Github user ottobackwards commented on a diff in the pull request:
https://github.com/apache/metron/pull/1184#discussion_r223833982
--- Diff:
metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
---
@@ -134,26 +144,102 @@ public void init() {
@SuppressWarnings("unchecked")
@Override
public List<JSONObject> parse(byte[] rawMessage) {
+ Optional<MessageParserResult<JSONObject>> resultOptional =
parseOptionalResult(rawMessage);
+ if (!resultOptional.isPresent()) {
+ return Collections.EMPTY_LIST;
+ }
+ Map<Object,Throwable> errors =
resultOptional.get().getMessageThrowables();
+ if (!errors.isEmpty()) {
+ throw new
RuntimeException(errors.entrySet().iterator().next().getValue());
+ }
+
+ return resultOptional.get().getMessages();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Optional<MessageParserResult<JSONObject>>
parseOptionalResult(byte[] rawMessage) {
if (grok == null) {
init();
}
+ if (multiLine) {
+ return parseMultiLine(rawMessage);
+ }
+ return parseSingleLine(rawMessage);
+ }
+
+ @SuppressWarnings("unchecked")
+ private Optional<MessageParserResult<JSONObject>> parseMultiLine(byte[]
rawMessage) {
List<JSONObject> messages = new ArrayList<>();
+ Map<Object,Throwable> errors = new HashMap<>();
String originalMessage = null;
// read the incoming raw data as if it may have multiple lines of logs
// if there is only only one line, it will just get processed.
try (BufferedReader reader = new BufferedReader(new StringReader(new
String(rawMessage, StandardCharsets.UTF_8)))) {
while ((originalMessage = reader.readLine()) != null) {
LOG.debug("Grok parser parsing message: {}", originalMessage);
- Match gm = grok.match(originalMessage);
- gm.captures();
- JSONObject message = new JSONObject();
- message.putAll(gm.toMap());
+ try {
+ Match gm = grok.match(originalMessage);
+ gm.captures();
+ JSONObject message = new JSONObject();
+ message.putAll(gm.toMap());
- if (message.size() == 0)
- throw new RuntimeException("Grok statement produced a null
message. Original message was: "
- + originalMessage + " and the parsed message was: " +
message + " . Check the pattern at: "
- + grokPath);
+ if (message.size() == 0) {
+ Throwable rte = new RuntimeException("Grok statement produced
a null message. Original message was: "
+ + originalMessage + " and the parsed message was: " +
message + " . Check the pattern at: "
+ + grokPath);
+ errors.put(originalMessage, rte);
+ continue;
+ }
+ message.put("original_string", originalMessage);
+ for (String timeField : timeFields) {
+ String fieldValue = (String) message.get(timeField);
+ if (fieldValue != null) {
+ message.put(timeField, toEpoch(fieldValue));
+ }
+ }
+ if (timestampField != null) {
+ message.put(Constants.Fields.TIMESTAMP.getName(),
formatTimestamp(message.get(timestampField)));
+ }
+ message.remove(patternLabel);
+ postParse(message);
+ messages.add(message);
+ LOG.debug("Grok parser parsed message: {}", message);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ errors.put(originalMessage, e);
+ }
+ }
+ } catch (IOException e) {
--- End diff --
That is right
---