Repository: apex-malhar
Updated Branches:
  refs/heads/master 3df6715d7 -> 42b9e2281


APEXMALHAR-2033-StreamingParser JSON streaming parser


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/065bf020
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/065bf020
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/065bf020

Branch: refs/heads/master
Commit: 065bf0207476293cedc9744ef5fa00b09d2435c9
Parents: 3e1cd8f
Author: devtagare <[email protected]>
Authored: Tue May 24 16:52:42 2016 -0700
Committer: devtagare <[email protected]>
Committed: Sun Jun 5 23:02:57 2016 -0700

----------------------------------------------------------------------
 contrib/pom.xml                                 |   6 +
 .../malhar/contrib/parser/JsonKeyFinder.java    | 154 +++++++
 .../contrib/parser/StreamingJsonParser.java     | 426 +++++++++++++++++++
 .../malhar/contrib/parser/package-info.java     |  20 +
 .../contrib/parser/StreamingJsonParserTest.java | 414 ++++++++++++++++++
 5 files changed, 1020 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/065bf020/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index c0ec6c7..cf98feb 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -641,6 +641,12 @@
        </exclusions>
     </dependency>  
     <dependency>
+      <!-- json-simple provides the event based JSON parser (JSONParser / ContentHandler)
+           used by org.apache.apex.malhar.contrib.parser.StreamingJsonParser and JsonKeyFinder -->
+      <groupId>com.googlecode.json-simple</groupId>
+      <artifactId>json-simple</artifactId>
+      <version>1.1.1</version>
+      <optional>true</optional>
+    </dependency> 
+    <dependency>
       <groupId>com.github.fge</groupId>
       <artifactId>json-schema-validator</artifactId>
       <version>2.0.1</version>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/065bf020/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/JsonKeyFinder.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/JsonKeyFinder.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/JsonKeyFinder.java
new file mode 100644
index 0000000..51aee27
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/JsonKeyFinder.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.parser;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.json.simple.parser.ContentHandler;
+import org.json.simple.parser.ParseException;
+
+/**
+ * A concrete implementation of the json-simple {@link ContentHandler} that
+ * collects the primitive values of a configured list of JSON keys.<br>
+ * The keys to look for are set by {@link StreamingJsonParser} through
+ * {@link #setMatchKeyList(ArrayList)}. Matched values are stored in
+ * {@link #getKeyValMap()} and the number of distinct matched keys is available
+ * from {@link #getKeyCount()}.<br>
+ * {@link #primitive(Object)} returns {@code false} after every match so that the
+ * json-simple parser stops and the caller can decide whether it has seen enough
+ * keys or should resume parsing; every other callback returns {@code true} so
+ * parsing continues.<br>
+ * A value is matched by the name of the most recently started object entry,
+ * regardless of its nesting depth. Instances keep per-document state in plain,
+ * unsynchronized fields.
+ */
+public class JsonKeyFinder implements ContentHandler
+{
+  // value of the most recently matched key
+  private Object value;
+  // matched JSON key -> primitive value; callers clear it between documents
+  private HashMap<Object, Object> keyValMap;
+  // number of distinct keys from matchKeyList stored in keyValMap
+  private int keyCount = 0;
+  // whether a key was matched; reset by startJSON()
+  private boolean found = false;
+  // set by endJSON(), reset by startJSON()
+  private boolean end = false;
+  // name of the most recently started object entry; cleared after a match
+  private String key;
+  // JSON keys whose primitive values are collected
+  private ArrayList<String> matchKeyList;
+
+  public JsonKeyFinder()
+  {
+    keyValMap = new HashMap<>();
+  }
+
+  /**
+   * @return number of distinct keys from the match list found so far
+   */
+  public int getKeyCount()
+  {
+    return keyCount;
+  }
+
+  public void setKeyCount(int keyCount)
+  {
+    this.keyCount = keyCount;
+  }
+
+  /**
+   * @return map of matched JSON key to its primitive value
+   */
+  public HashMap<Object, Object> getKeyValMap()
+  {
+    return keyValMap;
+  }
+
+  public void setKeyValMap(HashMap<Object, Object> keyValMap)
+  {
+    this.keyValMap = keyValMap;
+  }
+
+  /**
+   * Sets the JSON keys whose primitive values should be collected.
+   */
+  public void setMatchKeyList(ArrayList<String> matchKeyList)
+  {
+    this.matchKeyList = matchKeyList;
+  }
+
+  public ArrayList<String> getMatchKeyList()
+  {
+    return matchKeyList;
+  }
+
+  /**
+   * @return value of the most recently matched key
+   */
+  public Object getValue()
+  {
+    return value;
+  }
+
+  /**
+   * @return true once the parser has reported the end of the JSON document
+   */
+  public boolean isEnd()
+  {
+    return end;
+  }
+
+  public void setFound(boolean found)
+  {
+    this.found = found;
+  }
+
+  public boolean isFound()
+  {
+    return found;
+  }
+
+  @Override
+  public void startJSON() throws ParseException, IOException
+  {
+    // a new document starts: clear the per-document flags
+    found = false;
+    end = false;
+  }
+
+  @Override
+  public void endJSON() throws ParseException, IOException
+  {
+    end = true;
+  }
+
+  /**
+   * Stores the value if the current entry name is one of the keys to match.
+   *
+   * @return false after a match, to suspend the parser; true otherwise
+   */
+  @Override
+  public boolean primitive(Object value) throws ParseException, IOException
+  {
+    ArrayList<String> keys = getMatchKeyList();
+    if (key != null && keys != null && keys.contains(key)) {
+      found = true;
+      this.value = value;
+      // count every required key only once, so that a key repeated at another
+      // nesting level cannot make the caller stop before all keys were found
+      if (!keyValMap.containsKey(key)) {
+        keyCount++;
+      }
+      keyValMap.put(key, value);
+      key = null;
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public boolean startArray() throws ParseException, IOException
+  {
+    return true;
+  }
+
+  @Override
+  public boolean startObject() throws ParseException, IOException
+  {
+    return true;
+  }
+
+  @Override
+  public boolean startObjectEntry(String key) throws ParseException, IOException
+  {
+    this.key = key;
+    return true;
+  }
+
+  @Override
+  public boolean endArray() throws ParseException, IOException
+  {
+    // keep parsing: only a key match suspends the parser
+    return true;
+  }
+
+  @Override
+  public boolean endObject() throws ParseException, IOException
+  {
+    return true;
+  }
+
+  @Override
+  public boolean endObjectEntry() throws ParseException, IOException
+  {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/065bf020/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParser.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParser.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParser.java
new file mode 100644
index 0000000..7cc5821
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParser.java
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.parser;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.StringTokenizer;
+
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Operator that parses a JSON record tuple and emits a POJO on the output port
+ * and tuples that could not be parsed on the error port. Only the JSON keys
+ * listed in the field mapping are extracted, and parsing of a record stops as
+ * soon as all of them have been found. Upstream operator needs to ensure that a
+ * full JSON record is emitted.<br>
+ * <b>Properties</b><br>
+ * <b>pojoClass</b>:POJO class, taken from the TUPLE_CLASS attribute of the
+ * output port<br>
+ * <b>(optional)fieldMappingString</b>:String of format
+ * fieldNameInJson:fieldNameInPOJO:DataType. When it is not set, the mapping is
+ * generated by reflection from the non-static fields of the POJO class.<br>
+ * <b>Ports</b> <br>
+ * <b>in</b>:input tuple as a byte array holding one UTF-8 encoded JSON record<br>
+ * <b>output</b>:tuples that are validated as per the user defined POJO are
+ * emitted as POJO on this port<br>
+ * <b>err</b>:tuples that could not be parsed are emitted on this port as
+ * KeyValPair&lt;String,String&gt;<br>
+ * Key being the tuple and Val being the reason
+ *
+ * @displayName SimpleStreamingJsonParser
+ * @category Parsers
+ * @tags json pojo parser streaming
+ */
+@InterfaceStability.Evolving
+public class StreamingJsonParser extends Parser<byte[], KeyValPair<String, String>>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(StreamingJsonParser.class);
+  private static final String FIELD_SEPARATOR = ":";
+  private static final String RECORD_SEPARATOR = ",";
+
+  private transient JSONParser jsonParser;
+  // Not transient: transient fields are skipped when the operator is
+  // serialized for deployment, which would drop the user supplied mapping.
+  private String fieldMappingString;
+  private transient List<FieldInfo> fieldInfos;
+  private transient List<ActiveFieldInfo> columnFieldSetters;
+  // recreated in the output port's setup, so there is nothing to checkpoint
+  protected transient JsonKeyFinder finder;
+  private transient ArrayList<String> columnFields;
+  private transient Class<?> pojoClass;
+
+  public StreamingJsonParser()
+  {
+  }
+
+  /**
+   * @return POJO class
+   */
+  private Class<?> getPojoClass()
+  {
+    return pojoClass;
+  }
+
+  /**
+   * Sets the POJO class. It is overwritten with the TUPLE_CLASS attribute when
+   * the output port is set up.
+   */
+  public void setPojoClass(Class<?> pojoClass)
+  {
+    this.pojoClass = pojoClass;
+  }
+
+  /**
+   * Returns a string representing mapping from JSON keys to POJO fields
+   */
+  public String getFieldMappingString()
+  {
+    return fieldMappingString;
+  }
+
+  /**
+   * Comma separated list mapping a key in the JSON record to a POJO field, eg :
+   * fieldNameInJSON:fieldNameInPOJO:DataType
+   */
+  public void setFieldMappingString(String pojoFieldsToJsonMapping)
+  {
+    this.fieldMappingString = pojoFieldsToJsonMapping;
+  }
+
+  @Override
+  public void processTuple(byte[] tuple)
+  {
+    incomingTuplesCount++;
+    Object obj = convert(tuple);
+    if (obj != null) {
+      output.emit(obj);
+      emittedObjectCount++;
+    }
+  }
+
+  /**
+   * Parses an incoming tuple and returns a POJO object.<br>
+   * A null tuple, a malformed JSON record or a record whose values cannot be
+   * set on the POJO is emitted on the error port and null is returned.
+   *
+   * @param tuple UTF-8 encoded JSON record
+   * @return the populated POJO, or null if the tuple could not be converted
+   * @throws RuntimeException if the POJO class cannot be instantiated
+   */
+  @Override
+  public Object convert(byte[] tuple)
+  {
+    if (tuple == null) {
+      emitError(null, "null tuple");
+      return null;
+    }
+
+    String str = new String(tuple, StandardCharsets.UTF_8);
+    try {
+      finder.setKeyCount(0);
+      finder.getKeyValMap().clear();
+      // The parser is reset after every record, so the first call starts a new
+      // document (JsonKeyFinder.startJSON() clears the end flag left by the
+      // previous record); later calls resume after the finder paused on a
+      // matched key. Stop once every mapped key was found or the document ended.
+      do {
+        jsonParser.parse(str, finder, true);
+      } while (!finder.isEnd() && finder.getKeyCount() < columnFields.size());
+    } catch (ParseException e) {
+      LOG.error("Exception in parsing the record", e);
+      emitError(str, e.getMessage());
+      return null;
+    } finally {
+      // never leave the parser in a half parsed state for the next record
+      jsonParser.reset();
+    }
+
+    Object pojo;
+    try {
+      pojo = setPojoFields(finder.getKeyValMap());
+    } catch (IllegalAccessException | InstantiationException e) {
+      // the POJO class itself is unusable: a configuration error, so fail fast
+      emitError(str, e.getMessage());
+      LOG.error("Exception in parsing the record", e);
+      throw new RuntimeException(e);
+    }
+    if (pojo == null) {
+      emitError(str, "Could not set the POJO fields");
+    }
+    return pojo;
+  }
+
+  /**
+   * Emits a failed tuple and the reason on the error port (if connected) and
+   * counts it as an error tuple.
+   */
+  private void emitError(String tuple, String reason)
+  {
+    if (err.isConnected()) {
+      err.emit(new KeyValPair<String, String>(tuple, reason));
+    }
+    errorTupleCount++;
+  }
+
+  /**
+   * Creates the list of mappings from a string of comma separated
+   * fieldNameInJSON:fieldNameInPOJO:DataType entries. Invalid entries are
+   * logged and skipped.
+   *
+   * @return List of FieldInfo
+   */
+  private List<FieldInfo> createFieldInfoMap(String str)
+  {
+    fieldInfos = new ArrayList<FieldInfo>();
+    StringTokenizer strtok = new StringTokenizer(str, RECORD_SEPARATOR);
+
+    while (strtok.hasMoreTokens()) {
+      String mapping = strtok.nextToken().trim();
+      String[] token = mapping.split(FIELD_SEPARATOR);
+      if (token.length < 3) {
+        LOG.error("Invalid field mapping '{}', expected jsonKey:pojoField:TYPE", mapping);
+        continue;
+      }
+      try {
+        fieldInfos.add(new FieldInfo(token[0].trim(), token[1].trim(),
+            SupportType.valueOf(token[2].trim().toUpperCase(Locale.ENGLISH))));
+      } catch (IllegalArgumentException e) {
+        LOG.error("Invalid support type", e);
+      }
+    }
+    return fieldInfos;
+  }
+
+  @Override
+  public KeyValPair<String, String> processErrorTuple(byte[] input)
+  {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  /**
+   * Class that maps fieldInfo to its getters or setters
+   */
+  protected static class ActiveFieldInfo
+  {
+    final FieldInfo fieldInfo;
+    Object setterOrGetter;
+
+    ActiveFieldInfo(FieldInfo fieldInfo)
+    {
+      this.fieldInfo = fieldInfo;
+    }
+  }
+
+  /**
+   * A list of {@link FieldInfo}s where each item maps a JSON key to a pojo
+   * field name.
+   */
+  private List<FieldInfo> getFieldInfos()
+  {
+    return fieldInfos;
+  }
+
+  /**
+   * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a JSON key to a
+   * pojo field name.<br/>
+   * The value from fieldInfo.column is assigned to
+   * fieldInfo.pojoFieldExpression.
+   *
+   * @description $[].columnName name of the Output Field in POJO
+   * @description $[].pojoFieldExpression expression to get the respective field
+   *              from generic record
+   * @useSchema $[].pojoFieldExpression outputPort.fields[].name
+   */
+  private void setFieldInfos(List<FieldInfo> fieldInfos)
+  {
+    this.fieldInfos = fieldInfos;
+  }
+
+  /**
+   * Uses reflection to generate the field mapping if the user has not provided
+   * one. Every non-static, non-synthetic field declared by the class is mapped
+   * from the JSON key of the same name; the data type is the upper cased simple
+   * name of the field's (wrapper) type.
+   *
+   * @return String representing the JSON field to POJO field mapping; empty if
+   *         the class declares no such field
+   */
+  private String generateFieldInfoInputs(Class<?> cls)
+  {
+    StringBuilder sb = new StringBuilder();
+    for (Field f : cls.getDeclaredFields()) {
+      // constants and compiler generated fields are not part of the record
+      if (Modifier.isStatic(f.getModifiers()) || f.isSynthetic()) {
+        continue;
+      }
+      if (sb.length() > 0) {
+        sb.append(RECORD_SEPARATOR);
+      }
+      Class<?> c = ClassUtils.primitiveToWrapper(f.getType());
+      sb.append(f.getName()).append(FIELD_SEPARATOR).append(f.getName()).append(FIELD_SEPARATOR)
+          .append(c.getSimpleName().toUpperCase(Locale.ENGLISH));
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Adds the Active Fields to the columnFieldSetters {@link ActiveFieldInfo}s
+   */
+  private void initColumnFieldSetters(List<FieldInfo> fieldInfos)
+  {
+    if (columnFieldSetters == null) {
+      columnFieldSetters = Lists.newArrayList();
+    }
+    for (FieldInfo fi : fieldInfos) {
+      columnFieldSetters.add(new ActiveFieldInfo(fi));
+    }
+  }
+
+  /**
+   * Initialize the setters for generating the POJO
+   */
+  private void initializeActiveFieldSetters()
+  {
+    for (ActiveFieldInfo activeFieldInfo : columnFieldSetters) {
+      activeFieldInfo.setterOrGetter = createFieldSetter(activeFieldInfo.fieldInfo);
+    }
+  }
+
+  /**
+   * Creates the {@link PojoUtils} setter matching the data type of a mapping.
+   */
+  private Object createFieldSetter(FieldInfo fieldInfo)
+  {
+    String expression = fieldInfo.getPojoFieldExpression();
+    switch (fieldInfo.getType()) {
+      case BOOLEAN:
+        return PojoUtils.createSetterBoolean(getPojoClass(), expression);
+      case DOUBLE:
+        return PojoUtils.createSetterDouble(getPojoClass(), expression);
+      case FLOAT:
+        return PojoUtils.createSetterFloat(getPojoClass(), expression);
+      case INTEGER:
+        return PojoUtils.createSetterInt(getPojoClass(), expression);
+      case LONG:
+        return PojoUtils.createSetterLong(getPojoClass(), expression);
+      default:
+        // STRING and any other type: generic setter typed with the type's Java class
+        return PojoUtils.createSetter(getPojoClass(), expression, fieldInfo.getType().getJavaType());
+    }
+  }
+
+  /**
+   * Returns a POJO populated from the extracted key/value pairs. A field is
+   * left at its default value if its key was not found in the parsed JSON or
+   * its value is JSON null.
+   *
+   * @return the POJO, or null if a value could not be converted or set
+   */
+  @SuppressWarnings("unchecked")
+  private Object setPojoFields(HashMap<Object, Object> tuple) throws InstantiationException, IllegalAccessException
+  {
+    Object newObj = getPojoClass().newInstance();
+    try {
+      for (ActiveFieldInfo afi : columnFieldSetters) {
+        Object val = tuple.get(afi.fieldInfo.getColumnName());
+        //Nothing to set if a value is null
+        if (val == null) {
+          continue;
+        }
+
+        switch (afi.fieldInfo.getType()) {
+          case BOOLEAN:
+            ((PojoUtils.SetterBoolean<Object>)afi.setterOrGetter).set(newObj, (Boolean)val);
+            break;
+          case DOUBLE:
+            // json-simple represents integral JSON numbers as Long, so accept any Number
+            ((PojoUtils.SetterDouble<Object>)afi.setterOrGetter).set(newObj, ((Number)val).doubleValue());
+            break;
+          case FLOAT:
+            ((PojoUtils.SetterFloat<Object>)afi.setterOrGetter).set(newObj, ((Number)val).floatValue());
+            break;
+          case INTEGER:
+            ((PojoUtils.SetterInt<Object>)afi.setterOrGetter).set(newObj, Ints.checkedCast((Long)val));
+            break;
+          case LONG:
+            ((PojoUtils.SetterLong<Object>)afi.setterOrGetter).set(newObj, (Long)val);
+            break;
+          case STRING:
+            ((PojoUtils.Setter<Object, String>)afi.setterOrGetter).set(newObj, val.toString());
+            break;
+          default:
+            ((PojoUtils.Setter<Object, Object>)afi.setterOrGetter).set(newObj, val);
+            break;
+        }
+      }
+    } catch (Exception ex) {
+      LOG.error("Generic Exception in setting value", ex);
+      return null;
+    }
+    return newObj;
+  }
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>()
+  {
+    @Override
+    public void setup(PortContext context)
+    {
+      jsonParser = new JSONParser();
+      finder = new JsonKeyFinder();
+      columnFields = new ArrayList<String>();
+      columnFieldSetters = Lists.newArrayList();
+
+      setPojoClass(context.getValue(Context.PortContext.TUPLE_CLASS));
+
+      String mapping = getFieldMappingString();
+      if (mapping == null) {
+        mapping = generateFieldInfoInputs(getPojoClass());
+      }
+      setFieldInfos(createFieldInfoMap(mapping));
+      initColumnFieldSetters(getFieldInfos());
+      initializeActiveFieldSetters();
+
+      for (FieldInfo fieldInfo : fieldInfos) {
+        columnFields.add(fieldInfo.getColumnName());
+      }
+      finder.setMatchKeyList(columnFields);
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/065bf020/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/package-info.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/package-info.java 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/package-info.java
new file mode 100644
index 0000000..22c2cd4
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+package org.apache.apex.malhar.contrib.parser;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/065bf020/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParserTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParserTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParserTest.java
new file mode 100644
index 0000000..3e69a02
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParserTest.java
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.parser;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.lib.helper.TestPortContext;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Tests for {@link StreamingJsonParser}.
+ */
+public class StreamingJsonParserTest
+{
+  public static final String fieldInfoInitMap = "id:id:INTEGER," + "name:name:STRING," + "gpa:gpa:DOUBLE";
+
+  public static final String nestedFieldInfoMap = "id:id:INTEGER," + "name:name:STRING," + "gpa:gpa:DOUBLE,"
+      + "streetAddress:streetAddress:STRING," + "city:city:STRING," + "state:state:STRING,"
+      + "postalCode:postalCode:STRING";
+
+  public static final String invalidFieldInfoMap = "Field1:id:INTEGER," + "name:name:STRING," + "gpa:gpa:DOUBLE";
+
+  // input file name, created inside the directory scanned by the application test
+  private static final String FILENAME = "streaming.json";
+
+  CollectorTestSink<Object> outputSink = new CollectorTestSink<Object>();
+  CollectorTestSink<Object> errorSink = new CollectorTestSink<Object>();
+
+  StreamingJsonParser jsonParser = new StreamingJsonParser();
+
+  private List<String> recordList = null;
+
+  public class TestMeta extends TestWatcher
+  {
+    Context.OperatorContext context;
+    Context.PortContext portContext;
+    public String dir = null;
+
+    @Override
+    protected void starting(Description description)
+    {
+      String methodName = description.getMethodName();
+      String className = description.getClassName();
+      this.dir = "target/" + className + "/" + methodName;
+      Attribute.AttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      portAttributes.put(Context.PortContext.TUPLE_CLASS, Person.class);
+      portContext = new TestPortContext(portAttributes);
+      super.starting(description);
+      jsonParser.output.setup(portContext);
+      jsonParser.output.setSink(outputSink);
+      jsonParser.err.setSink(errorSink);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      try {
+        FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      jsonParser.teardown();
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testReads() throws Exception
+  {
+    int count = 5;
+    createReaderInput(count);
+    jsonParser.setFieldMappingString(fieldInfoInitMap);
+    jsonParser.setup(testMeta.context);
+    jsonParser.output.setup(testMeta.portContext);
+
+    processRecords();
+
+    Assert.assertEquals("Number of tuples", count, outputSink.collectedTuples.size());
+    Person obj = (Person)outputSink.collectedTuples.get(0);
+    Assert.assertEquals("Name is", "name-5", obj.getName());
+    jsonParser.teardown();
+  }
+
+  @Test
+  public void testNestedReads() throws Exception
+  {
+    int count = 4;
+    createReaderInput(count);
+    jsonParser.setFieldMappingString(nestedFieldInfoMap);
+    jsonParser.setup(testMeta.context);
+    jsonParser.output.setup(testMeta.portContext);
+
+    processRecords();
+
+    Assert.assertEquals("Number of tuples", count, outputSink.collectedTuples.size());
+    Person obj = (Person)outputSink.collectedTuples.get(0);
+    Assert.assertEquals("Name is", "name-4", obj.getName());
+    jsonParser.teardown();
+  }
+
+  @Test
+  public void testReadsWithReflection() throws Exception
+  {
+    int count = 6;
+    createReaderInput(count);
+    jsonParser.setFieldMappingString(null);
+    jsonParser.setup(testMeta.context);
+    jsonParser.output.setup(testMeta.portContext);
+
+    processRecords();
+
+    Assert.assertEquals("Number of tuples", count, outputSink.collectedTuples.size());
+    Person obj = (Person)outputSink.collectedTuples.get(0);
+    Assert.assertEquals("Name is", "name-6", obj.getName());
+    jsonParser.teardown();
+  }
+
+  @Test
+  public void testInvalidKeyMapping() throws Exception
+  {
+    int count = 6;
+    createReaderInput(count);
+    jsonParser.setFieldMappingString(invalidFieldInfoMap);
+    jsonParser.setup(testMeta.context);
+    jsonParser.output.setup(testMeta.portContext);
+
+    processRecords();
+
+    Assert.assertEquals("Number of tuples", count, outputSink.collectedTuples.size());
+    Person obj = (Person)outputSink.collectedTuples.get(0);
+    Assert.assertNull("Id is", obj.getId());
+    Assert.assertEquals("Name is", "name-6", obj.getName());
+    jsonParser.teardown();
+  }
+
+  /**
+   * Feeds every record of {@link #recordList} to the parser within window 0.
+   */
+  private void processRecords()
+  {
+    jsonParser.beginWindow(0);
+    for (String record : recordList) {
+      jsonParser.in.process(record.getBytes(StandardCharsets.UTF_8));
+    }
+    jsonParser.endWindow();
+  }
+
+  /**
+   * Fills {@link #recordList} with {@code count} JSON records, ids counting down
+   * from {@code count} to 1, each with a nested address object.
+   */
+  private void createReaderInput(int count)
+  {
+    String address = "\"address\":{" + "\"streetAddress\": \"21 2nd Street\"," + "\"city\": \"New York\","
+        + "\"state\": \"NY\"," + "\"postalCode\": \"10021\"}";
+    recordList = Lists.newArrayList();
+    for (int i = count; i > 0; i--) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("{").append("\"id\"").append(":").append(i).append(",");
+      sb.append("\"name\":").append("\"").append("name-" + i).append("\"").append(",");
+      for (int e = 0; e < 4; e++) {
+        sb.append("\"Elective-").append(e).append("\":\"elective-").append(i * (e + 1)).append("\",");
+      }
+      sb.append("\"gpa\":").append(i * 2.0).append(",");
+      sb.append(address).append("}");
+      recordList.add(sb.toString());
+    }
+  }
+
+  /**
+   * Writes every record of {@link #recordList} on its own line (UTF-8), so that
+   * the line based {@link FileInputOperator} emits one JSON record per tuple.
+   * Missing parent directories are created.
+   */
+  private void writeJsonInputFile(File file) throws IOException
+  {
+    FileUtils.writeLines(file, "UTF-8", recordList);
+  }
+
+  @Test
+  public void testApplicationWithPojoConversion() throws Exception
+  {
+    try {
+      FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+      int cnt = 7;
+      createReaderInput(cnt);
+      // the input must be inside the directory scanned by the file input operator
+      writeJsonInputFile(new File(testMeta.dir, FILENAME));
+      FileInputOperator fileInput = new FileInputOperator();
+      fileInput.setDirectory(testMeta.dir);
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      JsonStreamingParserApp streamingParserApp = new JsonStreamingParserApp();
+      streamingParserApp.setParser(jsonParser);
+      streamingParserApp.setFileInput(fileInput);
+      lma.prepareDAG(streamingParserApp, conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.run(10000);// runs for 10 seconds and quits
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+  /**
+   * File input operator that emits every line of a file as UTF-8 bytes.
+   */
+  public static class FileInputOperator extends AbstractFileInputOperator<String>
+  {
+    public final transient DefaultOutputPort<byte[]> output = new DefaultOutputPort<byte[]>();
+
+    protected transient BufferedReader br;
+
+    @Override
+    protected InputStream openFile(Path path) throws IOException
+    {
+      InputStream is = super.openFile(path);
+      br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
+      return is;
+    }
+
+    @Override
+    protected void closeFile(InputStream is) throws IOException
+    {
+      super.closeFile(is);
+      br.close();
+      br = null;
+    }
+
+    @Override
+    protected String readEntity() throws IOException
+    {
+      return br.readLine();
+    }
+
+    @Override
+    protected void emit(String tuple)
+    {
+      output.emit(tuple.getBytes(StandardCharsets.UTF_8));
+    }
+  }
+
+  public static class JsonStreamingParserApp implements StreamingApplication
+  {
+
+    StreamingJsonParser parser;
+    FileInputOperator fileInput;
+
+    public FileInputOperator getFileInput()
+    {
+      return fileInput;
+    }
+
+    public void setFileInput(FileInputOperator fileInput)
+    {
+      this.fileInput = fileInput;
+    }
+
+    public StreamingJsonParser getParser()
+    {
+      return parser;
+    }
+
+    public void setParser(StreamingJsonParser parser)
+    {
+      this.parser = parser;
+    }
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      FileInputOperator input = dag.addOperator("fileInput", getFileInput());
+      StreamingJsonParser parserOperator = dag.addOperator("parser", getParser());
+      dag.getMeta(parserOperator).getMeta(parserOperator.output).getAttributes()
+          .put(Context.PortContext.TUPLE_CLASS, Person.class);
+      ConsoleOutputOperator consoleOutput = dag.addOperator("output", new ConsoleOutputOperator());
+      dag.addStream("Input", input.output, parserOperator.in).setLocality(Locality.CONTAINER_LOCAL);
+      dag.addStream("pojo", parserOperator.output, consoleOutput.input).setLocality(Locality.CONTAINER_LOCAL);
+    }
+
+  }
+
+  public static class Person
+  {
+    private Integer id;
+    private String name;
+    private Double gpa;
+    private String streetAddress;
+    private String city;
+    private String postalCode;
+    private String state;
+
+    public Integer getId()
+    {
+      return id;
+    }
+
+    public void setId(Integer id)
+    {
+      this.id = id;
+    }
+
+    public String getName()
+    {
+      return name;
+    }
+
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    public Double getGpa()
+    {
+      return gpa;
+    }
+
+    public void setGpa(Double gpa)
+    {
+      this.gpa = gpa;
+    }
+
+    public String getStreetAddress()
+    {
+      return streetAddress;
+    }
+
+    public void setStreetAddress(String streetAddress)
+    {
+      this.streetAddress = streetAddress;
+    }
+
+    public String getCity()
+    {
+      return city;
+    }
+
+    public void setCity(String city)
+    {
+      this.city = city;
+    }
+
+    public String getPostalCode()
+    {
+      return postalCode;
+    }
+
+    public void setPostalCode(String postalCode)
+    {
+      this.postalCode = postalCode;
+    }
+
+    public String getState()
+    {
+      return state;
+    }
+
+    public void setState(String state)
+    {
+      this.state = state;
+    }
+  }
+}

Reply via email to