Repository: apex-malhar Updated Branches: refs/heads/master 89b29378e -> d45e8369c
APEXMALHAR-2376 Add Common Log support in LogParser operator Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/f29c7d45 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/f29c7d45 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/f29c7d45 Branch: refs/heads/master Commit: f29c7d45b92cacda80de8bb218d5db8235fb2d68 Parents: 7f1abca Author: akshay <[email protected]> Authored: Tue Jan 31 12:36:38 2017 +0530 Committer: akshay <[email protected]> Committed: Wed Mar 15 11:12:30 2017 +0530 ---------------------------------------------------------------------- .../malhar/contrib/parser/CommonLogParser.java | 84 +++++++++++++ .../malhar/contrib/parser/log/CommonLog.java | 120 ++++++++++++++++++ .../contrib/parser/CommonLogParserTest.java | 124 +++++++++++++++++++ 3 files changed, 328 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f29c7d45/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/CommonLogParser.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/CommonLogParser.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/CommonLogParser.java new file mode 100644 index 0000000..ec20810 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/CommonLogParser.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.parser; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.contrib.parser.log.CommonLog; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Context; + +/** + * Operator that parses a log string tuple against + * a specified JSON schema and emits a POJO on the parsed port and tuples that could not be + * parsed on the error port.<br> + * <b>Properties</b><br> + * <b>jsonSchema</b>:schema as a string<br> + * <b>pojoClass</b>:Pojo class in case of user specified schema<br> + * <b>Ports</b> <br> + * <b>in</b>:input tuple as a String. Each tuple represents a log<br> + * <b>parsedOutput</b>:tuples that are validated against the specified schema are emitted + * as POJO on this port<br> + * <b>err</b>:tuples that do not conform to the log format are emitted on this port as + * KeyValPair<String,String><br> + * Key being the tuple and Val being the reason. 
+ */ [email protected] +public class CommonLogParser extends LogParser +{ + private static final Logger logger = LoggerFactory.getLogger(CommonLogParser.class); + + private String schema="{\n" + + " \"fields\": [{\n" + + " \"field\": \"host\",\n" + + " \"regex\": \"^([0-9.]+)\"\n" + + " }, {\n" + + " \"field\": \"rfc931\",\n" + + " \"regex\": \"(\\\\S+)\"\n" + + " }, {\n" + + " \"field\": \"username\",\n" + + " \"regex\": \"(\\\\S+)\"\n" + + " }, {\n" + + " \"field\": \"datetime\",\n" + + " \"regex\": \"\\\\[(.*?)\\\\]\"\n" + + " },{\n" + + " \"field\": \"request\",\n" + + " \"regex\": \"\\\"((?:[^\\\"]|\\\")+)\\\"\"\n" + + " },{\n" + + " \"field\": \"statusCode\",\n" + + " \"regex\": \"(\\\\d{3})\"\n" + + " },{\n" + + " \"field\": \"bytes\",\n" + + " \"regex\": \"(\\\\d+|-)\"\n" + + " }]\n" + + "}"; + + + @Override + public void setup(Context.OperatorContext context) + { + this.setLogFileFormat(schema); + super.setup(context); + super.setClazz(CommonLog.class); + } + + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f29c7d45/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/log/CommonLog.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/log/CommonLog.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/log/CommonLog.java new file mode 100644 index 0000000..aa44a76 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/log/CommonLog.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.parser.log; + +import org.apache.apex.malhar.contrib.parser.LogParser; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * This is the default log format parser for the <a href="https://en.wikipedia.org/wiki/Common_Log_Format">Common Log Format</a>. + * To use this format with the {@link LogParser} operator, set the property "logFileFormat" to "COMMON". + */ [email protected] +public class CommonLog +{ + private String host; + private String rfc931; + private String username; + private String datetime; + private String request; + private String statusCode; + private String bytes; + + @Override + public String toString() + { + return "CommonLog [ Host : " + this.getHost() + + ", rfc931 : " + this.getRfc931() + + ", userName : " + this.getUsername() + + ", dateTime : " + this.getDatetime() + + ", request : " + this.getRequest() + + ", statusCode : " + this.getStatusCode() + + ", bytes : " + this.getBytes() + " ]"; + } + + public String getHost() + { + return host; + } + + public void setHost(String host) + { + this.host = host; + } + + public String getRfc931() + { + return rfc931; + } + + public void setRfc931(String rfc931) + { + this.rfc931 = rfc931; + } + + public String getUsername() + { + return username; + } + + public void setUsername(String username) + { + this.username = username; + } + + public String getDatetime() + { + return datetime; + } + + public void setDatetime(String datetime) + { + this.datetime = datetime; + } + + public String getRequest() + { + return request; + } + + public 
void setRequest(String request) + { + this.request = request; + } + + public String getStatusCode() + { + return statusCode; + } + + public void setStatusCode(String statusCode) + { + this.statusCode = statusCode; + } + + public String getBytes() + { + return bytes; + } + + public void setBytes(String bytes) + { + this.bytes = bytes; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f29c7d45/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/CommonLogParserTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/CommonLogParserTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/CommonLogParserTest.java new file mode 100644 index 0000000..ec529fe --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/CommonLogParserTest.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.parser; + +import java.io.IOException; + +import org.codehaus.jettison.json.JSONException; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import org.apache.apex.malhar.contrib.parser.log.CommonLog; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +public class CommonLogParserTest +{ + + CommonLogParser commonLogParser = new CommonLogParser(); + + private CollectorTestSink<Object> error = new CollectorTestSink<Object>(); + + private CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>(); + + @Rule + public Watcher watcher = new Watcher(); + + public class Watcher extends TestWatcher + { + @Override + protected void starting(Description description) + { + super.starting(description); + commonLogParser.err.setSink(error); + commonLogParser.parsedOutput.setSink(pojoPort); + } + + @Override + protected void finished(Description description) + { + super.finished(description); + error.clear(); + pojoPort.clear(); + commonLogParser.teardown(); + } + } + + @Test + public void TestEmptyInput() + { + String tuple = ""; + commonLogParser.setup(null); + commonLogParser.beginWindow(0); + commonLogParser.in.process(tuple.getBytes()); + commonLogParser.endWindow(); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } + + @Test + public void TestNullInput() + { + commonLogParser.setup(null); + commonLogParser.in.process(null); + commonLogParser.endWindow(); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } + + @Test + public void TestValidCommonLogInputCase() throws JSONException, IOException + { + + commonLogParser.setup(null); + commonLogParser.beginWindow(0); + String log = "125.125.125.125 - dsmith [10/Oct/1999:21:15:05 +0500] \"GET /index.html HTTP/1.0\" 200 1043"; + 
commonLogParser.in.process(log.getBytes()); + commonLogParser.endWindow(); + Assert.assertEquals(1, pojoPort.collectedTuples.size()); + Assert.assertEquals(0, error.collectedTuples.size()); + Object obj = pojoPort.collectedTuples.get(0); + Assert.assertNotNull(obj); + Assert.assertEquals(CommonLog.class, obj.getClass()); + CommonLog pojo = (CommonLog)obj; + Assert.assertNotNull(obj); + Assert.assertEquals("125.125.125.125", pojo.getHost()); + Assert.assertEquals("dsmith", pojo.getUsername()); + Assert.assertEquals("10/Oct/1999:21:15:05 +0500", pojo.getDatetime()); + Assert.assertEquals("GET /index.html HTTP/1.0", pojo.getRequest()); + Assert.assertEquals("200", pojo.getStatusCode()); + Assert.assertEquals("1043", pojo.getBytes()); + } + + @Test + public void TestInvalidCommonLogInput() + { + String tuple = "127.0.0.1 - dsmith 10/Oct/1999:21:15:05] \"GET /index.html HTTP/1.0\" 200 1043"; + commonLogParser.setup(null); + commonLogParser.beginWindow(0); + commonLogParser.in.process(tuple.getBytes()); + commonLogParser.endWindow(); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } + +}
