Repository: apex-malhar Updated Branches: refs/heads/master 2aeeb2ae1 -> 113978fd3
APEXMALHAR-2365-Creation of generic log parser Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/21aa3d6c Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/21aa3d6c Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/21aa3d6c Branch: refs/heads/master Commit: 21aa3d6cf2f4be3fc5297252bc8ef6de15d27f62 Parents: f4fb62c Author: jogshraddha <[email protected]> Authored: Fri Dec 16 15:51:01 2016 +0530 Committer: jogshraddha <[email protected]> Committed: Fri Dec 16 15:51:01 2016 +0530 ---------------------------------------------------------------------- .../datatorrent/contrib/parser/LogParser.java | 235 +++++++++++++++++++ .../contrib/parser/LogSchemaDetails.java | 234 ++++++++++++++++++ .../contrib/parser/LogParserTest.java | 168 +++++++++++++ .../src/test/resources/invalidLogSchema.json | 15 ++ contrib/src/test/resources/logSchema.json | 15 ++ 5 files changed, 667 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/21aa3d6c/contrib/src/main/java/com/datatorrent/contrib/parser/LogParser.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/LogParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/LogParser.java new file mode 100644 index 0000000..a1f1290 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/parser/LogParser.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.parser; + +import java.io.IOException; +import javax.validation.constraints.NotNull; +import org.codehaus.jettison.json.JSONException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.CharEncoding; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.InterfaceStability; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.lib.parser.Parser; +import com.datatorrent.lib.util.KeyValPair; + +/** + * Operator that parses a log string tuple against + * a specified json schema and emits a POJO on the parsed port and tuples that could not be + * parsed on the error port.<br> + * <b>Properties</b><br> + * <b>logFileFormat</b>:json schema as a string<br> + * <b>clazz</b>:Pojo class in case of user specified schema<br> + * <b>Ports</b> <br> + * <b>in</b>:input tuple as a byte array that is decoded to a String. Each tuple represents a log<br> + * <b>parsedOutput</b>:tuples that are validated against the specified schema are emitted + * as POJO on this port<br> + * <b>err</b>:tuples that do not conform to the log format are emitted on this port as + * KeyValPair<String,String><br> + * Key being the tuple and Val being the reason. 
+ */ [email protected] +public class LogParser extends Parser<byte[], KeyValPair<String, String>> +{ + private transient Class<?> clazz; + + @NotNull + private String logFileFormat; + + private String encoding; + + private LogSchemaDetails logSchemaDetails; + + private transient ObjectMapper objMapper; + + @Override + public Object convert(byte[] tuple) + { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public KeyValPair<String, String> processErrorTuple(byte[] bytes) + { + return null; + } + + /** + * output port to emit valid records as POJO + */ + public transient DefaultOutputPort<Object> parsedOutput = new DefaultOutputPort<Object>() + { + public void setup(Context.PortContext context) + { + clazz = context.getValue(Context.PortContext.TUPLE_CLASS); + } + }; + + /** + * metric to keep count of number of tuples emitted on {@link #parsedOutput} + * port + */ + @AutoMetric + long parsedOutputCount; + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + parsedOutputCount = 0; + } + + @Override + public void setup(Context.OperatorContext context) + { + objMapper = new ObjectMapper(); + encoding = encoding != null ? 
encoding : CharEncoding.UTF_8; + setupLog(); + } + + @Override + public void processTuple(byte[] inputTuple) + { + if (inputTuple == null) { + this.emitError(null, "null tuple"); + return; + } + String incomingString = ""; + try { + incomingString = new String(inputTuple, encoding); + if (StringUtils.isBlank(incomingString)) { + this.emitError(incomingString, "Blank tuple"); + return; + } + logger.debug("Input string {} ", incomingString); + logger.debug("Parsing with log format {}", this.geLogFileFormat()); + if (this.logSchemaDetails != null && clazz != null) { + if (parsedOutput.isConnected()) { + parsedOutput.emit(objMapper.readValue(this.logSchemaDetails.createJsonFromLog(incomingString).toString().getBytes(), clazz)); + parsedOutputCount++; + } + } + } catch (NullPointerException | IOException | JSONException e) { + this.emitError(incomingString, e.getMessage()); + logger.error("Failed to parse log tuple {}, Exception = {} ", inputTuple, e); + } + } + + /** + * Emits error on error port + * @param tuple + * @param errorMsg + */ + public void emitError(String tuple, String errorMsg) + { + if (err.isConnected()) { + err.emit(new KeyValPair<String, String>(tuple, errorMsg)); + } + errorTupleCount++; + } + + /** + * Setup for the logs according to the logFileFormat + */ + public void setupLog() + { + try { + //parse the schema from logFileFormat string + this.logSchemaDetails = new LogSchemaDetails(logFileFormat); + } catch (IllegalArgumentException e) { + logger.error("Error while initializing the custom log format " + e.getMessage()); + } + } + + /** + * Set log file format required for parsing the log + * @param logFileFormat + */ + public void setLogFileFormat(String logFileFormat) + { + this.logFileFormat = logFileFormat; + } + + /** + * Get log file format required for parsing the log + * @return logFileFormat + */ + public String geLogFileFormat() + { + return logFileFormat; + } + + /** + * Get encoding parameter for converting tuple into String + * 
@return encoding + */ + public String getEncoding() + { + return encoding; + } + + /** + * Set encoding parameter for converting tuple into String + * @param encoding + */ + public void setEncoding(String encoding) + { + this.encoding = encoding; + } + + /** + * Get log schema details (field, regex etc) + * @return logSchemaDetails + */ + public LogSchemaDetails getLogSchemaDetails() { + return logSchemaDetails; + } + + /** + * Set log schema details like (fields and regex) + * @param logSchemaDetails + */ + public void setLogSchemaDetails(LogSchemaDetails logSchemaDetails) { + this.logSchemaDetails = logSchemaDetails; + } + + /** + * Get the class that needs to be formatted + * @return Class<?> + */ + public Class<?> getClazz() + { + return clazz; + } + + /** + * Set the class of tuple that needs to be formatted + * @param clazz + */ + public void setClazz(Class<?> clazz) + { + this.clazz = clazz; + } + + private static final Logger logger = LoggerFactory.getLogger(LogParser.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/21aa3d6c/contrib/src/main/java/com/datatorrent/contrib/parser/LogSchemaDetails.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/LogSchemaDetails.java b/contrib/src/main/java/com/datatorrent/contrib/parser/LogSchemaDetails.java new file mode 100644 index 0000000..2cd9cb4 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/parser/LogSchemaDetails.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.parser; + +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * <p> + * This is the schema that defines the fields and their regexes. + * The operators use this information to validate the incoming tuples. + * Information from JSON schema is saved in this object and is used by the + * operators + * <p> + * <br> + * <br> + * Example schema <br> + * <br> + * {@code { "fields": [{"field": "host","regex": "^([0-9.]+)"}, + * {"field": "userName","regex": "(.*?)"}, + * {"field": "request","regex": "\"((?:[^\"]|\")+)\""}, + * {"field": "statusCode","regex": "(\\d{3})"}, + * {"field": "bytes","regex": "(\\d+|-)"}]} } + */ +public class LogSchemaDetails +{ + /** + * This holds the list of field names in the same order as in the schema + */ + private List<String> fieldNames = new LinkedList(); + + private List<Field> fields = new LinkedList(); + + private Pattern compiledPattern = null; + + /** + * This holds regex pattern for the schema + */ + private String pattern; + + public LogSchemaDetails(String json) + { + try { + initialize(json); + createPattern(); + this.compiledPattern = Pattern.compile(this.pattern); + } catch (JSONException | IOException e) { + logger.error("{}", e); + throw new IllegalArgumentException(e); + } + } + + 
/** + * For a given json string, this method sets the field members + * @param json + * @throws JSONException + * @throws IOException + */ + private void initialize(String json) throws JSONException, IOException + { + JSONObject jsonObject = new JSONObject(json); + JSONArray fieldArray = jsonObject.getJSONArray("fields"); + + for(int i = 0; i < fieldArray.length(); i++) { + JSONObject obj = fieldArray.getJSONObject(i); + Field field = new Field(obj.getString("field"), obj.getString("regex")); + this.fields.add(field); + this.fieldNames.add(field.name); + } + } + + /** + * creates regex group pattern from the regex given for each field + */ + public void createPattern() + { + StringBuffer pattern = new StringBuffer(); + for(Field field: this.getFields()) { + pattern.append(field.getRegex()).append(" "); + } + logger.info("Created pattern for parsing the log {}", pattern.toString().trim()); + this.setPattern(pattern.toString().trim()); + } + + /** + * creates json object by matching the log with given pattern + * @param log + * @return logObject + * @throws Exception + */ + public JSONObject createJsonFromLog(String log) throws JSONException + { + JSONObject logObject = null; + if (this.compiledPattern != null) { + Matcher m = this.compiledPattern.matcher(log); + int count = m.groupCount(); + if (m.find()) { + int i = 1; + logObject = new JSONObject(); + for(String field: this.getFieldNames()) { + if (i > count) { + break; + } + logObject.put(field, m.group(i)); + i++; + } + } + } + return logObject; + } + + /** + * Get the list of fieldNames mentioned in schema + * @return fieldNames + */ + public List<String> getFieldNames() + { + return fieldNames; + } + + /** + * Get the list of fields (field, regex) mentioned in schema + * @return fields + */ + public List<Field> getFields() + { + return fields; + } + + /** + * Get the regex pattern for the schema + * @return pattern + */ + public String getPattern() + { + return pattern; + } + + /** + * Set the regex pattern 
for schema + * @param pattern + */ + public void setPattern(String pattern) + { + this.pattern = pattern; + } + + public class Field + { + /** + * name of the field + */ + private String name; + /** + * regular expression for the field + */ + private String regex; + + + public Field(String name, String regex) + { + this.name = name; + this.regex = regex; + } + + /** + * Get the name of the field + * @return name + */ + public String getName() + { + return name; + } + + /** + * Set the name of the field + * @param name + */ + public void setName(String name) + { + this.name = name; + } + + /** + * Get the regular expression of the field + * @return regex + */ + public String getRegex() + { + return regex; + } + + /** + * Set the regular expression of the field + * @param regex + */ + public void setRegex(String regex) + { + this.regex = regex; + } + + @Override + public String toString() + { + return "Fields [name=" + name + ", regex=" + regex +"]"; + } + } + + private static final Logger logger = LoggerFactory.getLogger(LogSchemaDetails.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/21aa3d6c/contrib/src/test/java/com/datatorrent/contrib/parser/LogParserTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/parser/LogParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/parser/LogParserTest.java new file mode 100644 index 0000000..53ae033 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/parser/LogParserTest.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.parser; + +import org.codehaus.jettison.json.JSONException; +import org.jooq.exception.IOException; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import com.datatorrent.lib.appdata.schemas.SchemaUtils; +import com.datatorrent.lib.testbench.CollectorTestSink; + +public class LogParserTest +{ + private String filename = "logSchema.json"; + + LogParser logParser = new LogParser(); + + private CollectorTestSink<Object> error = new CollectorTestSink<Object>(); + + private CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>(); + + @Rule + public Watcher watcher = new Watcher(); + + public class Watcher extends TestWatcher + { + @Override + protected void starting(Description description) + { + super.starting(description); + logParser.err.setSink(error); + logParser.parsedOutput.setSink(pojoPort); + } + + @Override + protected void finished(Description description) + { + super.finished(description); + error.clear(); + pojoPort.clear(); + logParser.teardown(); + } + } + + @Test + public void TestEmptyInput() + { + String tuple = ""; + logParser.beginWindow(0); + logParser.in.process(tuple.getBytes()); + logParser.endWindow(); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } + + @Test + public void TestNullInput() + { + logParser.beginWindow(0); + logParser.in.process(null); + logParser.endWindow(); + Assert.assertEquals(0, 
pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } + + @Test + public void TestSchemaInput() throws JSONException, java.io.IOException + { + logParser.setLogFileFormat(SchemaUtils.jarResourceFileToString(filename)); + logParser.setup(null); + logParser.setClazz(LogSchema.class); + logParser.setLogSchemaDetails(new LogSchemaDetails(logParser.geLogFileFormat())); + String log = "125.125.125.125 smith 200 1043"; + logParser.beginWindow(0); + logParser.in.process(log.getBytes()); + logParser.endWindow(); + Assert.assertEquals(1, pojoPort.collectedTuples.size()); + Assert.assertEquals(0, error.collectedTuples.size()); + Object obj = pojoPort.collectedTuples.get(0); + Assert.assertNotNull(obj); + LogSchema pojo = (LogSchema) obj; + Assert.assertEquals("125.125.125.125", pojo.getHost()); + Assert.assertEquals("smith", pojo.getUserName()); + Assert.assertEquals("200", pojo.getStatusCode()); + Assert.assertEquals("1043", pojo.getBytes()); + } + + @Test + public void TestInvalidSchemaInput() throws JSONException, IOException + { + logParser.setLogFileFormat(SchemaUtils.jarResourceFileToString("invalidLogSchema.json")); + logParser.setup(null); + logParser.setClazz(LogSchema.class); + logParser.setLogSchemaDetails(new LogSchemaDetails(logParser.geLogFileFormat())); + String log = "125.125.125.125 smith 200 1043"; + logParser.beginWindow(0); + logParser.in.process(log.getBytes()); + logParser.endWindow(); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } + + public static class LogSchema { + private String host; + private String userName; + private String statusCode; + private String bytes; + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String username) { + this.userName = username; + } + + public String getStatusCode() 
{ + return statusCode; + } + + public void setStatusCode(String statusCode) { + this.statusCode = statusCode; + } + + public String getBytes() { + return bytes; + } + + public void setBytes(String bytes) { + this.bytes = bytes; + } + + @Override + public String toString() + { + return "LogSchema [host=" + host + ", userName=" + userName + + ", statusCode=" + statusCode + ", bytes=" + bytes + "]"; + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/21aa3d6c/contrib/src/test/resources/invalidLogSchema.json ---------------------------------------------------------------------- diff --git a/contrib/src/test/resources/invalidLogSchema.json b/contrib/src/test/resources/invalidLogSchema.json new file mode 100644 index 0000000..317247b --- /dev/null +++ b/contrib/src/test/resources/invalidLogSchema.json @@ -0,0 +1,15 @@ +{ + "fields": [{ + "field": "host", + "regex": "^([0-9.]+)" + }, { + "field": "userName", + "regex": " " + }, { + "field": "statusCode", + "regex": "(\\d{3})" + }, { + "field": "bytes", + "regex": "(\\d+|-)" + }] +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/21aa3d6c/contrib/src/test/resources/logSchema.json ---------------------------------------------------------------------- diff --git a/contrib/src/test/resources/logSchema.json b/contrib/src/test/resources/logSchema.json new file mode 100644 index 0000000..ded73dd --- /dev/null +++ b/contrib/src/test/resources/logSchema.json @@ -0,0 +1,15 @@ +{ + "fields": [{ + "field": "host", + "regex": "^([0-9.]+)" + }, { + "field": "userName", + "regex": "([a-zA-Z0-9_-]{1,16})" + }, { + "field": "statusCode", + "regex": "(\\d{3})" + }, { + "field": "bytes", + "regex": "(\\d+|-)" + }] +}
