Repository: apex-malhar
Updated Branches:
  refs/heads/master ca6995ca4 -> 52510b0f8


APEXMALHAR-2377-Move LogParser to org.apache.apex.malhar.contrib.parser package


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/12c4bb11
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/12c4bb11
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/12c4bb11

Branch: refs/heads/master
Commit: 12c4bb11af9854c295668ec759dcb7dd49a70838
Parents: 113978f
Author: jogshraddha <jog.shrad...@gmail.com>
Authored: Mon Dec 26 18:18:24 2016 +0530
Committer: jogshraddha <jog.shrad...@gmail.com>
Committed: Mon Dec 26 18:19:45 2016 +0530

----------------------------------------------------------------------
 .../datatorrent/contrib/parser/LogParser.java   | 235 -------------------
 .../contrib/parser/LogSchemaDetails.java        | 234 ------------------
 .../apex/malhar/contrib/parser/LogParser.java   | 235 +++++++++++++++++++
 .../malhar/contrib/parser/LogSchemaDetails.java | 234 ++++++++++++++++++
 .../contrib/parser/LogParserTest.java           | 168 -------------
 .../malhar/contrib/parser/LogParserTest.java    | 171 ++++++++++++++
 6 files changed, 640 insertions(+), 637 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/12c4bb11/contrib/src/main/java/com/datatorrent/contrib/parser/LogParser.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/parser/LogParser.java 
b/contrib/src/main/java/com/datatorrent/contrib/parser/LogParser.java
deleted file mode 100644
index a1f1290..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/LogParser.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.parser;
-
-import java.io.IOException;
-import javax.validation.constraints.NotNull;
-import org.codehaus.jettison.json.JSONException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.commons.lang3.CharEncoding;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.classification.InterfaceStability;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.datatorrent.api.AutoMetric;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.lib.parser.Parser;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * Operator that parses a log string tuple against the
- * a specified json schema and emits POJO on a parsed port and tuples that 
could not be
- * parsed on error port.<br>
- * <b>Properties</b><br>
- * <b>jsonSchema</b>:schema as a string<br>
- * <b>clazz</b>:Pojo class in case of user specified schema<br>
- * <b>Ports</b> <br>
- * <b>in</b>:input tuple as a String. Each tuple represents a log<br>
- * <b>parsedOutput</b>:tuples that are validated against the specified schema 
are emitted
- * as POJO on this port<br>
- * <b>err</b>:tuples that do not confine to log format are emitted on this 
port as
- * KeyValPair<String,String><br>
- * Key being the tuple and Val being the reason.
- */
-@InterfaceStability.Unstable
-public class LogParser extends Parser<byte[], KeyValPair<String, String>>
-{
-  private transient Class<?> clazz;
-
-  @NotNull
-  private String logFileFormat;
-
-  private String encoding;
-
-  private LogSchemaDetails logSchemaDetails;
-
-  private transient ObjectMapper objMapper;
-
-  @Override
-  public Object convert(byte[] tuple)
-  {
-    throw new UnsupportedOperationException("Not supported");
-  }
-
-  @Override
-  public KeyValPair<String, String> processErrorTuple(byte[] bytes)
-  {
-    return null;
-  }
-
-  /**
-   * output port to emit valid records as POJO
-   */
-  public transient DefaultOutputPort<Object> parsedOutput = new 
DefaultOutputPort<Object>()
-  {
-    public void setup(Context.PortContext context)
-    {
-      clazz = context.getValue(Context.PortContext.TUPLE_CLASS);
-    }
-  };
-
-  /**
-   * metric to keep count of number of tuples emitted on {@link #parsedOutput}
-   * port
-   */
-  @AutoMetric
-  long parsedOutputCount;
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-    super.beginWindow(windowId);
-    parsedOutputCount = 0;
-  }
-
-  @Override
-  public void setup(Context.OperatorContext context)
-  {
-    objMapper = new ObjectMapper();
-    encoding = encoding != null ? encoding : CharEncoding.UTF_8;
-    setupLog();
-  }
-
-  @Override
-  public void processTuple(byte[] inputTuple)
-  {
-    if (inputTuple == null) {
-      this.emitError(null, "null tuple");
-      return;
-    }
-    String incomingString = "";
-    try {
-      incomingString = new String(inputTuple, encoding);
-      if (StringUtils.isBlank(incomingString)) {
-        this.emitError(incomingString, "Blank tuple");
-        return;
-      }
-      logger.debug("Input string {} ", incomingString);
-      logger.debug("Parsing with log format {}", this.geLogFileFormat());
-      if (this.logSchemaDetails != null && clazz != null) {
-        if (parsedOutput.isConnected()) {
-          
parsedOutput.emit(objMapper.readValue(this.logSchemaDetails.createJsonFromLog(incomingString).toString().getBytes(),
 clazz));
-          parsedOutputCount++;
-        }
-      }
-    } catch (NullPointerException | IOException | JSONException e) {
-      this.emitError(incomingString, e.getMessage());
-      logger.error("Failed to parse log tuple {}, Exception = {} ", 
inputTuple, e);
-    }
-  }
-
-  /**
-   * Emits error on error port
-   * @param tuple
-   * @param errorMsg
-   */
-  public void emitError(String tuple, String errorMsg)
-  {
-    if (err.isConnected()) {
-      err.emit(new KeyValPair<String, String>(tuple, errorMsg));
-    }
-    errorTupleCount++;
-  }
-
-  /**
-   *  Setup for the logs according to the logFileFormat
-   */
-  public void setupLog()
-  {
-    try {
-      //parse the schema from logFileFormat string
-      this.logSchemaDetails = new LogSchemaDetails(logFileFormat);
-    } catch (IllegalArgumentException e) {
-      logger.error("Error while initializing the custom log format " + 
e.getMessage());
-    }
-  }
-
-  /**
-   * Set log file format required for parsing the log
-   * @param logFileFormat
-   */
-  public void setLogFileFormat(String logFileFormat)
-  {
-    this.logFileFormat = logFileFormat;
-  }
-
-  /**
-   * Get log file format required for parsing the log
-   * @return logFileFormat
-   */
-  public String geLogFileFormat()
-  {
-    return logFileFormat;
-  }
-
-  /**
-   * Get encoding parameter for converting tuple into String
-   * @return logSchemaDetails
-   */
-  public String getEncoding()
-  {
-    return encoding;
-  }
-
-  /**
-   * Set encoding parameter for converting tuple into String
-   * @param encoding
-   */
-  public void setEncoding(String encoding)
-  {
-    this.encoding = encoding;
-  }
-
-  /**
-   * Get log schema details (field, regex etc)
-   * @return logSchemaDetails
-   */
-  public LogSchemaDetails getLogSchemaDetails() {
-    return logSchemaDetails;
-  }
-
-  /**
-   * Set log schema details like (fields and regex)
-   * @param logSchemaDetails
-   */
-  public void setLogSchemaDetails(LogSchemaDetails logSchemaDetails) {
-    this.logSchemaDetails = logSchemaDetails;
-  }
-
-  /**
-   * Get the class that needs to be formatted
-   * @return Class<?>
-   */
-  public Class<?> getClazz()
-  {
-    return clazz;
-  }
-
-  /**
-   * Set the class of tuple that needs to be formatted
-   * @param clazz
-   */
-  public void setClazz(Class<?> clazz)
-  {
-    this.clazz = clazz;
-  }
-
-  private static final Logger logger = 
LoggerFactory.getLogger(LogParser.class);
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/12c4bb11/contrib/src/main/java/com/datatorrent/contrib/parser/LogSchemaDetails.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/parser/LogSchemaDetails.java 
b/contrib/src/main/java/com/datatorrent/contrib/parser/LogSchemaDetails.java
deleted file mode 100644
index 2cd9cb4..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/LogSchemaDetails.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.parser;
-
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * <p>
- * This is schema that defines fields and their regex
- * The operators use this information to validate the incoming tuples.
- * Information from JSON schema is saved in this object and is used by the
- * operators
- * <p>
- * <br>
- * <br>
- * Example schema <br>
- * <br>
- * {@code{ "fields": [{"field": "host","regex": "^([0-9.]+)"},
- * {"field": "userName","regex": "(.*?)"},
- * {"field": "request","regex": "\"((?:[^\"]|\")+)\""},
- * {"field": "statusCode","regex": "(\\d{3})"},
- * {"field": "bytes","regex": "(\\d+|-)"}]}
- */
-public class LogSchemaDetails
-{
-  /**
-   * This holds the list of field names in the same order as in the schema
-   */
-  private List<String> fieldNames = new LinkedList();
-
-  private List<Field> fields = new LinkedList();
-
-  private Pattern compiledPattern = null;
-
-  /**
-   * This holds regex pattern for the schema
-   */
-  private String pattern;
-
-  public LogSchemaDetails(String json)
-  {
-    try {
-      initialize(json);
-      createPattern();
-      this.compiledPattern = Pattern.compile(this.pattern);
-    } catch (JSONException | IOException e) {
-      logger.error("{}", e);
-      throw new IllegalArgumentException(e);
-    }
-  }
-
-  /**
-   * For a given json string, this method sets the field members
-   * @param json
-   * @throws JSONException
-   * @throws IOException
-   */
-  private void initialize(String json) throws JSONException, IOException
-  {
-    JSONObject jsonObject = new JSONObject(json);
-    JSONArray fieldArray = jsonObject.getJSONArray("fields");
-
-    for(int i = 0; i < fieldArray.length(); i++) {
-      JSONObject obj = fieldArray.getJSONObject(i);
-      Field field = new Field(obj.getString("field"), obj.getString("regex"));
-      this.fields.add(field);
-      this.fieldNames.add(field.name);
-    }
-  }
-
-  /**
-   * creates regex group pattern from the regex given for each field
-   */
-  public void createPattern()
-  {
-    StringBuffer pattern = new StringBuffer();
-    for(Field field: this.getFields()) {
-      pattern.append(field.getRegex()).append(" ");
-    }
-    logger.info("Created pattern for parsing the log {}", 
pattern.toString().trim());
-    this.setPattern(pattern.toString().trim());
-  }
-
-  /**
-   * creates json object by matching the log with given pattern
-   * @param log
-   * @return logObject
-   * @throws Exception
-   */
-  public JSONObject createJsonFromLog(String log) throws JSONException
-  {
-    JSONObject logObject = null;
-    if (this.compiledPattern != null) {
-      Matcher m = this.compiledPattern.matcher(log);
-      int count = m.groupCount();
-      if (m.find()) {
-        int i = 1;
-        logObject  = new JSONObject();
-        for(String field: this.getFieldNames()) {
-          if (i > count) {
-            break;
-          }
-          logObject.put(field, m.group(i));
-          i++;
-        }
-      }
-    }
-    return logObject;
-  }
-
-  /**
-   * Get the list of fieldNames mentioned in schema
-   * @return fieldNames
-   */
-  public List<String> getFieldNames()
-  {
-    return fieldNames;
-  }
-
-  /**
-   * Get the list of fields (field, regex) mentioned in schema
-   * @return fields
-   */
-  public List<Field> getFields()
-  {
-    return fields;
-  }
-
-  /**
-   * Get the regex pattern for the schema
-   * @return pattern
-   */
-  public String getPattern()
-  {
-    return pattern;
-  }
-
-  /**
-   * Set the regex pattern for schema
-   * @param pattern
-   */
-  public void setPattern(String pattern)
-  {
-    this.pattern = pattern;
-  }
-
-  public class Field
-  {
-    /**
-     * name of the field
-     */
-    private String name;
-    /**
-     * regular expression for the field
-     */
-    private String regex;
-
-
-    public Field(String name, String regex)
-    {
-      this.name = name;
-      this.regex = regex;
-    }
-
-    /**
-     * Get the name of the field
-     * @return name
-     */
-    public String getName()
-    {
-      return name;
-    }
-
-    /**
-     * Set the name of the field
-     * @param name
-     */
-    public void setName(String name)
-    {
-      this.name = name;
-    }
-
-    /**
-     * Get the regular expression of the field
-     * @return regex
-     */
-    public String getRegex()
-    {
-      return regex;
-    }
-
-    /**
-     * Set the regular expression of the field
-     * @param regex
-     */
-    public void setRegex(String regex)
-    {
-      this.regex = regex;
-    }
-
-    @Override
-    public String toString()
-    {
-      return "Fields [name=" + name + ", regex=" + regex +"]";
-    }
-  }
-
-  private static final Logger logger = 
LoggerFactory.getLogger(LogSchemaDetails.class);
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/12c4bb11/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/LogParser.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/LogParser.java 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/LogParser.java
new file mode 100644
index 0000000..7a4e906
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/LogParser.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.parser;
+
+import java.io.IOException;
+import javax.validation.constraints.NotNull;
+import org.codehaus.jettison.json.JSONException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.CharEncoding;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Operator that parses a log string tuple against the
+ * a specified json schema and emits POJO on a parsed port and tuples that 
could not be
+ * parsed on error port.<br>
+ * <b>Properties</b><br>
+ * <b>jsonSchema</b>:schema as a string<br>
+ * <b>clazz</b>:Pojo class in case of user specified schema<br>
+ * <b>Ports</b> <br>
+ * <b>in</b>:input tuple as a String. Each tuple represents a log<br>
+ * <b>parsedOutput</b>:tuples that are validated against the specified schema 
are emitted
+ * as POJO on this port<br>
+ * <b>err</b>:tuples that do not confine to log format are emitted on this 
port as
+ * KeyValPair<String,String><br>
+ * Key being the tuple and Val being the reason.
+ */
+@InterfaceStability.Unstable
+public class LogParser extends Parser<byte[], KeyValPair<String, String>>
+{
+  private transient Class<?> clazz;
+
+  @NotNull
+  private String logFileFormat;
+
+  private String encoding;
+
+  private LogSchemaDetails logSchemaDetails;
+
+  private transient ObjectMapper objMapper;
+
+  @Override
+  public Object convert(byte[] tuple)
+  {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public KeyValPair<String, String> processErrorTuple(byte[] bytes)
+  {
+    return null;
+  }
+
+  /**
+   * output port to emit valid records as POJO
+   */
+  public transient DefaultOutputPort<Object> parsedOutput = new 
DefaultOutputPort<Object>()
+  {
+    public void setup(Context.PortContext context)
+    {
+      clazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+  };
+
+  /**
+   * metric to keep count of number of tuples emitted on {@link #parsedOutput}
+   * port
+   */
+  @AutoMetric
+  long parsedOutputCount;
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    parsedOutputCount = 0;
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    objMapper = new ObjectMapper();
+    encoding = encoding != null ? encoding : CharEncoding.UTF_8;
+    setupLog();
+  }
+
+  @Override
+  public void processTuple(byte[] inputTuple)
+  {
+    if (inputTuple == null) {
+      this.emitError(null, "null tuple");
+      return;
+    }
+    String incomingString = "";
+    try {
+      incomingString = new String(inputTuple, encoding);
+      if (StringUtils.isBlank(incomingString)) {
+        this.emitError(incomingString, "Blank tuple");
+        return;
+      }
+      logger.debug("Input string {} ", incomingString);
+      logger.debug("Parsing with log format {}", this.geLogFileFormat());
+      if (this.logSchemaDetails != null && clazz != null) {
+        if (parsedOutput.isConnected()) {
+          
parsedOutput.emit(objMapper.readValue(this.logSchemaDetails.createJsonFromLog(incomingString).toString().getBytes(),
 clazz));
+          parsedOutputCount++;
+        }
+      }
+    } catch (NullPointerException | IOException | JSONException e) {
+      this.emitError(incomingString, e.getMessage());
+      logger.error("Failed to parse log tuple {}, Exception = {} ", 
inputTuple, e);
+    }
+  }
+
+  /**
+   * Emits error on error port
+   * @param tuple
+   * @param errorMsg
+   */
+  public void emitError(String tuple, String errorMsg)
+  {
+    if (err.isConnected()) {
+      err.emit(new KeyValPair<String, String>(tuple, errorMsg));
+    }
+    errorTupleCount++;
+  }
+
+  /**
+   *  Setup for the logs according to the logFileFormat
+   */
+  public void setupLog()
+  {
+    try {
+      //parse the schema from logFileFormat string
+      this.logSchemaDetails = new LogSchemaDetails(logFileFormat);
+    } catch (IllegalArgumentException e) {
+      logger.error("Error while initializing the custom log format " + 
e.getMessage());
+    }
+  }
+
+  /**
+   * Set log file format required for parsing the log
+   * @param logFileFormat
+   */
+  public void setLogFileFormat(String logFileFormat)
+  {
+    this.logFileFormat = logFileFormat;
+  }
+
+  /**
+   * Get log file format required for parsing the log
+   * @return logFileFormat
+   */
+  public String geLogFileFormat()
+  {
+    return logFileFormat;
+  }
+
+  /**
+   * Get encoding parameter for converting tuple into String
+   * @return logSchemaDetails
+   */
+  public String getEncoding()
+  {
+    return encoding;
+  }
+
+  /**
+   * Set encoding parameter for converting tuple into String
+   * @param encoding
+   */
+  public void setEncoding(String encoding)
+  {
+    this.encoding = encoding;
+  }
+
+  /**
+   * Get log schema details (field, regex etc)
+   * @return logSchemaDetails
+   */
+  public LogSchemaDetails getLogSchemaDetails() {
+    return logSchemaDetails;
+  }
+
+  /**
+   * Set log schema details like (fields and regex)
+   * @param logSchemaDetails
+   */
+  public void setLogSchemaDetails(LogSchemaDetails logSchemaDetails) {
+    this.logSchemaDetails = logSchemaDetails;
+  }
+
+  /**
+   * Get the class that needs to be formatted
+   * @return Class<?>
+   */
+  public Class<?> getClazz()
+  {
+    return clazz;
+  }
+
+  /**
+   * Set the class of tuple that needs to be formatted
+   * @param clazz
+   */
+  public void setClazz(Class<?> clazz)
+  {
+    this.clazz = clazz;
+  }
+
+  private static final Logger logger = 
LoggerFactory.getLogger(LogParser.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/12c4bb11/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/LogSchemaDetails.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/LogSchemaDetails.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/LogSchemaDetails.java
new file mode 100644
index 0000000..8120e00
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/LogSchemaDetails.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.parser;
+
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * <p>
+ * This is schema that defines fields and their regex
+ * The operators use this information to validate the incoming tuples.
+ * Information from JSON schema is saved in this object and is used by the
+ * operators
+ * <p>
+ * <br>
+ * <br>
+ * Example schema <br>
+ * <br>
+ * {@code{ "fields": [{"field": "host","regex": "^([0-9.]+)"},
+ * {"field": "userName","regex": "(.*?)"},
+ * {"field": "request","regex": "\"((?:[^\"]|\")+)\""},
+ * {"field": "statusCode","regex": "(\\d{3})"},
+ * {"field": "bytes","regex": "(\\d+|-)"}]}
+ */
+public class LogSchemaDetails
+{
+  /**
+   * This holds the list of field names in the same order as in the schema
+   */
+  private List<String> fieldNames = new LinkedList();
+
+  private List<Field> fields = new LinkedList();
+
+  private Pattern compiledPattern = null;
+
+  /**
+   * This holds regex pattern for the schema
+   */
+  private String pattern;
+
+  public LogSchemaDetails(String json)
+  {
+    try {
+      initialize(json);
+      createPattern();
+      this.compiledPattern = Pattern.compile(this.pattern);
+    } catch (JSONException | IOException e) {
+      logger.error("{}", e);
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  /**
+   * For a given json string, this method sets the field members
+   * @param json
+   * @throws JSONException
+   * @throws IOException
+   */
+  private void initialize(String json) throws JSONException, IOException
+  {
+    JSONObject jsonObject = new JSONObject(json);
+    JSONArray fieldArray = jsonObject.getJSONArray("fields");
+
+    for(int i = 0; i < fieldArray.length(); i++) {
+      JSONObject obj = fieldArray.getJSONObject(i);
+      Field field = new Field(obj.getString("field"), obj.getString("regex"));
+      this.fields.add(field);
+      this.fieldNames.add(field.name);
+    }
+  }
+
+  /**
+   * creates regex group pattern from the regex given for each field
+   */
+  public void createPattern()
+  {
+    StringBuffer pattern = new StringBuffer();
+    for(Field field: this.getFields()) {
+      pattern.append(field.getRegex()).append(" ");
+    }
+    logger.info("Created pattern for parsing the log {}", 
pattern.toString().trim());
+    this.setPattern(pattern.toString().trim());
+  }
+
+  /**
+   * creates json object by matching the log with given pattern
+   * @param log
+   * @return logObject
+   * @throws Exception
+   */
+  public JSONObject createJsonFromLog(String log) throws JSONException
+  {
+    JSONObject logObject = null;
+    if (this.compiledPattern != null) {
+      Matcher m = this.compiledPattern.matcher(log);
+      int count = m.groupCount();
+      if (m.find()) {
+        int i = 1;
+        logObject  = new JSONObject();
+        for(String field: this.getFieldNames()) {
+          if (i > count) {
+            break;
+          }
+          logObject.put(field, m.group(i));
+          i++;
+        }
+      }
+    }
+    return logObject;
+  }
+
+  /**
+   * Get the list of fieldNames mentioned in schema
+   * @return fieldNames
+   */
+  public List<String> getFieldNames()
+  {
+    return fieldNames;
+  }
+
+  /**
+   * Get the list of fields (field, regex) mentioned in schema
+   * @return fields
+   */
+  public List<Field> getFields()
+  {
+    return fields;
+  }
+
+  /**
+   * Get the regex pattern for the schema
+   * @return pattern
+   */
+  public String getPattern()
+  {
+    return pattern;
+  }
+
+  /**
+   * Set the regex pattern for schema
+   * @param pattern
+   */
+  public void setPattern(String pattern)
+  {
+    this.pattern = pattern;
+  }
+
+  public class Field
+  {
+    /**
+     * name of the field
+     */
+    private String name;
+    /**
+     * regular expression for the field
+     */
+    private String regex;
+
+
+    public Field(String name, String regex)
+    {
+      this.name = name;
+      this.regex = regex;
+    }
+
+    /**
+     * Get the name of the field
+     * @return name
+     */
+    public String getName()
+    {
+      return name;
+    }
+
+    /**
+     * Set the name of the field
+     * @param name
+     */
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    /**
+     * Get the regular expression of the field
+     * @return regex
+     */
+    public String getRegex()
+    {
+      return regex;
+    }
+
+    /**
+     * Set the regular expression of the field
+     * @param regex
+     */
+    public void setRegex(String regex)
+    {
+      this.regex = regex;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Fields [name=" + name + ", regex=" + regex +"]";
+    }
+  }
+
+  private static final Logger logger = 
LoggerFactory.getLogger(LogSchemaDetails.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/12c4bb11/contrib/src/test/java/com/datatorrent/contrib/parser/LogParserTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/com/datatorrent/contrib/parser/LogParserTest.java 
b/contrib/src/test/java/com/datatorrent/contrib/parser/LogParserTest.java
deleted file mode 100644
index 53ae033..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/parser/LogParserTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.parser;
-
-import org.codehaus.jettison.json.JSONException;
-import org.jooq.exception.IOException;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import com.datatorrent.lib.appdata.schemas.SchemaUtils;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-public class LogParserTest
-{
-  private String filename = "logSchema.json";
-
-  LogParser logParser = new LogParser();
-
-  private CollectorTestSink<Object> error = new CollectorTestSink<Object>();
-
-  private CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
-
-  @Rule
-  public Watcher watcher = new Watcher();
-
-  public class Watcher extends TestWatcher
-  {
-    @Override
-    protected void starting(Description description)
-    {
-      super.starting(description);
-      logParser.err.setSink(error);
-      logParser.parsedOutput.setSink(pojoPort);
-    }
-
-    @Override
-    protected void finished(Description description)
-    {
-      super.finished(description);
-      error.clear();
-      pojoPort.clear();
-      logParser.teardown();
-    }
-  }
-
-  @Test
-  public void TestEmptyInput()
-  {
-    String tuple = "";
-    logParser.beginWindow(0);
-    logParser.in.process(tuple.getBytes());
-    logParser.endWindow();
-    Assert.assertEquals(0, pojoPort.collectedTuples.size());
-    Assert.assertEquals(1, error.collectedTuples.size());
-  }
-
-  @Test
-  public void TestNullInput()
-  {
-    logParser.beginWindow(0);
-    logParser.in.process(null);
-    logParser.endWindow();
-    Assert.assertEquals(0, pojoPort.collectedTuples.size());
-    Assert.assertEquals(1, error.collectedTuples.size());
-  }
-
-  @Test
-  public void TestSchemaInput() throws JSONException, java.io.IOException
-  {
-    logParser.setLogFileFormat(SchemaUtils.jarResourceFileToString(filename));
-    logParser.setup(null);
-    logParser.setClazz(LogSchema.class);
-    logParser.setLogSchemaDetails(new LogSchemaDetails(logParser.geLogFileFormat()));
-    String log = "125.125.125.125 smith 200 1043";
-    logParser.beginWindow(0);
-    logParser.in.process(log.getBytes());
-    logParser.endWindow();
-    Assert.assertEquals(1, pojoPort.collectedTuples.size());
-    Assert.assertEquals(0, error.collectedTuples.size());
-    Object obj = pojoPort.collectedTuples.get(0);
-    Assert.assertNotNull(obj);
-    LogSchema pojo = (LogSchema) obj;
-    Assert.assertEquals("125.125.125.125", pojo.getHost());
-    Assert.assertEquals("smith", pojo.getUserName());
-    Assert.assertEquals("200", pojo.getStatusCode());
-    Assert.assertEquals("1043", pojo.getBytes());
-  }
-
-  @Test
-  public void TestInvalidSchemaInput() throws JSONException, IOException
-  {
-    logParser.setLogFileFormat(SchemaUtils.jarResourceFileToString("invalidLogSchema.json"));
-    logParser.setup(null);
-    logParser.setClazz(LogSchema.class);
-    logParser.setLogSchemaDetails(new LogSchemaDetails(logParser.geLogFileFormat()));
-    String log = "125.125.125.125 smith 200 1043";
-    logParser.beginWindow(0);
-    logParser.in.process(log.getBytes());
-    logParser.endWindow();
-    Assert.assertEquals(0, pojoPort.collectedTuples.size());
-    Assert.assertEquals(1, error.collectedTuples.size());
-  }
-
-  public static class LogSchema {
-    private String host;
-    private String userName;
-    private String statusCode;
-    private String bytes;
-
-    public String getHost() {
-      return host;
-    }
-
-    public void setHost(String host) {
-      this.host = host;
-    }
-
-    public String getUserName() {
-      return userName;
-    }
-
-    public void setUserName(String username) {
-      this.userName = username;
-    }
-
-    public String getStatusCode() {
-      return statusCode;
-    }
-
-    public void setStatusCode(String statusCode) {
-      this.statusCode = statusCode;
-    }
-
-    public String getBytes() {
-      return bytes;
-    }
-
-    public void setBytes(String bytes) {
-      this.bytes = bytes;
-    }
-
-    @Override
-    public String toString()
-    {
-      return "LogSchema [host=" + host + ", userName=" + userName
-        + ", statusCode=" + statusCode + ", bytes=" + bytes + "]";
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/12c4bb11/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/LogParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/LogParserTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/LogParserTest.java
new file mode 100644
index 0000000..996502d
--- /dev/null
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/LogParserTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.parser;
+
+import org.codehaus.jettison.json.JSONException;
+import org.jooq.exception.IOException;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.apex.malhar.contrib.parser.LogParser;
+import org.apache.apex.malhar.contrib.parser.LogSchemaDetails;
+
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/** Exercises {@link LogParser}: sends raw tuples through one window and checks the parsedOutput and err sinks. */
+public class LogParserTest
+{
+  private String filename = "logSchema.json";
+  LogParser logParser = new LogParser();
+  private CollectorTestSink<Object> error = new CollectorTestSink<Object>();
+  private CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
+
+  @Rule
+  public Watcher watcher = new Watcher();
+
+  /** Attaches the sinks when a test starts; clears them and tears the parser down when it finishes. */
+  public class Watcher extends TestWatcher
+  {
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      logParser.err.setSink(error);
+      logParser.parsedOutput.setSink(pojoPort);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+      error.clear();
+      pojoPort.clear();
+      logParser.teardown();
+    }
+  }
+
+  @Test
+  public void TestEmptyInput()
+  {
+    runSingleWindow("".getBytes());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestNullInput()
+  {
+    runSingleWindow(null);
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestSchemaInput() throws JSONException, java.io.IOException
+  {
+    applySchema(filename);
+    runSingleWindow("125.125.125.125 smith 200 1043".getBytes());
+    Assert.assertEquals(1, pojoPort.collectedTuples.size());
+    Assert.assertEquals(0, error.collectedTuples.size());
+    Object first = pojoPort.collectedTuples.get(0);
+    Assert.assertNotNull(first);
+    LogSchema parsed = (LogSchema) first;
+    Assert.assertEquals("125.125.125.125", parsed.getHost());
+    Assert.assertEquals("smith", parsed.getUserName());
+    Assert.assertEquals("200", parsed.getStatusCode());
+    Assert.assertEquals("1043", parsed.getBytes());
+  }
+
+  @Test
+  public void TestInvalidSchemaInput() throws JSONException, IOException
+  {
+    applySchema("invalidLogSchema.json");
+    runSingleWindow("125.125.125.125 smith 200 1043".getBytes());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  /** Configures the parser from the given schema resource with {@link LogSchema} as its tuple class. */
+  private void applySchema(String schemaResource) throws JSONException
+  {
+    logParser.setLogFileFormat(SchemaUtils.jarResourceFileToString(schemaResource));
+    logParser.setup(null);
+    logParser.setClazz(LogSchema.class);
+    logParser.setLogSchemaDetails(new LogSchemaDetails(logParser.geLogFileFormat()));
+  }
+
+  /** Feeds a single tuple (may be null) to the input port between beginWindow(0) and endWindow(). */
+  private void runSingleWindow(byte[] tuple)
+  {
+    logParser.beginWindow(0);
+    logParser.in.process(tuple);
+    logParser.endWindow();
+  }
+
+  /** POJO with the four string properties the tests read back from a parsed tuple. */
+  public static class LogSchema {
+    private String host;
+    private String userName;
+    private String statusCode;
+    private String bytes;
+
+    public String getHost() {
+      return host;
+    }
+
+    public void setHost(String host) {
+      this.host = host;
+    }
+
+    public String getUserName() {
+      return userName;
+    }
+
+    public void setUserName(String username) {
+      this.userName = username;
+    }
+
+    public String getStatusCode() {
+      return statusCode;
+    }
+
+    public void setStatusCode(String statusCode) {
+      this.statusCode = statusCode;
+    }
+
+    public String getBytes() {
+      return bytes;
+    }
+
+    public void setBytes(String bytes) {
+      this.bytes = bytes;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "LogSchema [host=" + host + ", userName=" + userName
+        + ", statusCode=" + statusCode + ", bytes=" + bytes + "]";
+    }
+  }
+}

Reply via email to