Repository: apex-malhar
Updated Branches:
  refs/heads/master 89b29378e -> d45e8369c


APEXMALHAR-2376 Add Common Log support in LogParser operator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/f29c7d45
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/f29c7d45
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/f29c7d45

Branch: refs/heads/master
Commit: f29c7d45b92cacda80de8bb218d5db8235fb2d68
Parents: 7f1abca
Author: akshay <akshay.har...@synerzip.com>
Authored: Tue Jan 31 12:36:38 2017 +0530
Committer: akshay <akshay.har...@synerzip.com>
Committed: Wed Mar 15 11:12:30 2017 +0530

----------------------------------------------------------------------
 .../malhar/contrib/parser/CommonLogParser.java  |  84 +++++++++++++
 .../malhar/contrib/parser/log/CommonLog.java    | 120 ++++++++++++++++++
 .../contrib/parser/CommonLogParserTest.java     | 124 +++++++++++++++++++
 3 files changed, 328 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f29c7d45/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/CommonLogParser.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/CommonLogParser.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/CommonLogParser.java
new file mode 100644
index 0000000..ec20810
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/CommonLogParser.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.parser;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.contrib.parser.log.CommonLog;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+
+/**
+ * Operator that parses a log string tuple against the
+ * a specified json schema and emits POJO on a parsed port and tuples that 
could not be
+ * parsed on error port.<br>
+ * <b>Properties</b><br>
+ * <b>jsonSchema</b>:schema as a string<br>
+ * <b>pojoClass</b>:Pojo class in case of user specified schema<br>
+ * <b>Ports</b> <br>
+ * <b>in</b>:input tuple as a String. Each tuple represents a log<br>
+ * <b>parsedOutput</b>:tuples that are validated against the specified schema 
are emitted
+ * as POJO on this port<br>
+ * <b>err</b>:tuples that do not confine to log format are emitted on this 
port as
+ * KeyValPair<String,String><br>
+ * Key being the tuple and Val being the reason.
+ */
+@InterfaceStability.Evolving
+public class CommonLogParser extends LogParser
+{
+  private static final Logger logger = 
LoggerFactory.getLogger(CommonLogParser.class);
+
+  private String schema="{\n" +
+    "  \"fields\": [{\n" +
+    "    \"field\": \"host\",\n" +
+    "    \"regex\": \"^([0-9.]+)\"\n" +
+    "  }, {\n" +
+    "    \"field\": \"rfc931\",\n" +
+    "    \"regex\": \"(\\\\S+)\"\n" +
+    "  }, {\n" +
+    "    \"field\": \"username\",\n" +
+    "    \"regex\": \"(\\\\S+)\"\n" +
+    "  }, {\n" +
+    "    \"field\": \"datetime\",\n" +
+    "    \"regex\": \"\\\\[(.*?)\\\\]\"\n" +
+    "  },{\n" +
+    "    \"field\": \"request\",\n" +
+    "    \"regex\": \"\\\"((?:[^\\\"]|\\\")+)\\\"\"\n" +
+    "  },{\n" +
+    "    \"field\": \"statusCode\",\n" +
+    "    \"regex\": \"(\\\\d{3})\"\n" +
+    "  },{\n" +
+    "    \"field\": \"bytes\",\n" +
+    "    \"regex\": \"(\\\\d+|-)\"\n" +
+    "  }]\n" +
+    "}";
+
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    this.setLogFileFormat(schema);
+    super.setup(context);
+    super.setClazz(CommonLog.class);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f29c7d45/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/log/CommonLog.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/log/CommonLog.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/log/CommonLog.java
new file mode 100644
index 0000000..aa44a76
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/log/CommonLog.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.parser.log;
+
+import org.apache.apex.malhar.contrib.parser.LogParser;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This is default log format parser for <a 
href="https://en.wikipedia.org/wiki/Common_Log_Format";>Common log </a>
+ * To use this format with {@link LogParser} operator just mention the 
property "logFileFormat" as "COMMON"
+ */
+@InterfaceStability.Evolving
+public class CommonLog
+{
+  private String host;
+  private String rfc931;
+  private String username;
+  private String datetime;
+  private String request;
+  private String statusCode;
+  private String bytes;
+
+  @Override
+  public String toString()
+  {
+    return "CommonLog [ Host : " + this.getHost() +
+      ", rfc931 : " + this.getRfc931() +
+      ", userName : " + this.getUsername() +
+      ", dateTime : " + this.getDatetime() +
+      ", request : " + this.getRequest() +
+      ", statusCode : " + this.getStatusCode() +
+      ", bytes : " + this.getBytes() + " ]";
+  }
+
+  public String getHost()
+  {
+    return host;
+  }
+
+  public void setHost(String host)
+  {
+    this.host = host;
+  }
+
+  public String getRfc931()
+  {
+    return rfc931;
+  }
+
+  public void setRfc931(String rfc931)
+  {
+    this.rfc931 = rfc931;
+  }
+
+  public String getUsername()
+  {
+    return username;
+  }
+
+  public void setUsername(String username)
+  {
+    this.username = username;
+  }
+
+  public String getDatetime()
+  {
+    return datetime;
+  }
+
+  public void setDatetime(String datetime)
+  {
+    this.datetime = datetime;
+  }
+
+  public String getRequest()
+  {
+    return request;
+  }
+
+  public void setRequest(String request)
+  {
+    this.request = request;
+  }
+
+  public String getStatusCode()
+  {
+    return statusCode;
+  }
+
+  public void setStatusCode(String statusCode)
+  {
+    this.statusCode = statusCode;
+  }
+
+  public String getBytes()
+  {
+    return bytes;
+  }
+
+  public void setBytes(String bytes)
+  {
+    this.bytes = bytes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f29c7d45/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/CommonLogParserTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/CommonLogParserTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/CommonLogParserTest.java
new file mode 100644
index 0000000..ec529fe
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/CommonLogParserTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.parser;
+
+import java.io.IOException;
+
+import org.codehaus.jettison.json.JSONException;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.apex.malhar.contrib.parser.log.CommonLog;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+public class CommonLogParserTest
+{
+
+  CommonLogParser commonLogParser = new CommonLogParser();
+
+  private CollectorTestSink<Object> error = new CollectorTestSink<Object>();
+
+  private CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
+
+  @Rule
+  public Watcher watcher = new Watcher();
+
+  public class Watcher extends TestWatcher
+  {
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      commonLogParser.err.setSink(error);
+      commonLogParser.parsedOutput.setSink(pojoPort);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+      error.clear();
+      pojoPort.clear();
+      commonLogParser.teardown();
+    }
+  }
+
+  @Test
+  public void TestEmptyInput()
+  {
+    String tuple = "";
+    commonLogParser.setup(null);
+    commonLogParser.beginWindow(0);
+    commonLogParser.in.process(tuple.getBytes());
+    commonLogParser.endWindow();
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestNullInput()
+  {
+    commonLogParser.setup(null);
+    commonLogParser.in.process(null);
+    commonLogParser.endWindow();
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestValidCommonLogInputCase() throws JSONException, IOException
+  {
+
+    commonLogParser.setup(null);
+    commonLogParser.beginWindow(0);
+    String log = "125.125.125.125 - dsmith [10/Oct/1999:21:15:05 +0500] \"GET 
/index.html HTTP/1.0\" 200 1043";
+    commonLogParser.in.process(log.getBytes());
+    commonLogParser.endWindow();
+    Assert.assertEquals(1, pojoPort.collectedTuples.size());
+    Assert.assertEquals(0, error.collectedTuples.size());
+    Object obj = pojoPort.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(CommonLog.class, obj.getClass());
+    CommonLog pojo = (CommonLog)obj;
+    Assert.assertNotNull(obj);
+    Assert.assertEquals("125.125.125.125", pojo.getHost());
+    Assert.assertEquals("dsmith", pojo.getUsername());
+    Assert.assertEquals("10/Oct/1999:21:15:05 +0500", pojo.getDatetime());
+    Assert.assertEquals("GET /index.html HTTP/1.0", pojo.getRequest());
+    Assert.assertEquals("200", pojo.getStatusCode());
+    Assert.assertEquals("1043", pojo.getBytes());
+  }
+
+  @Test
+  public void TestInvalidCommonLogInput()
+  {
+    String tuple = "127.0.0.1 - dsmith 10/Oct/1999:21:15:05] \"GET /index.html 
HTTP/1.0\" 200 1043";
+    commonLogParser.setup(null);
+    commonLogParser.beginWindow(0);
+    commonLogParser.in.process(tuple.getBytes());
+    commonLogParser.endWindow();
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+}

Reply via email to