Repository: apex-malhar Updated Branches: refs/heads/master 3df6715d7 -> 42b9e2281
APEXMALHAR-2033-StreamingParser JSON streaming parser Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/065bf020 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/065bf020 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/065bf020 Branch: refs/heads/master Commit: 065bf0207476293cedc9744ef5fa00b09d2435c9 Parents: 3e1cd8f Author: devtagare <[email protected]> Authored: Tue May 24 16:52:42 2016 -0700 Committer: devtagare <[email protected]> Committed: Sun Jun 5 23:02:57 2016 -0700 ---------------------------------------------------------------------- contrib/pom.xml | 6 + .../malhar/contrib/parser/JsonKeyFinder.java | 154 +++++++ .../contrib/parser/StreamingJsonParser.java | 426 +++++++++++++++++++ .../malhar/contrib/parser/package-info.java | 20 + .../contrib/parser/StreamingJsonParserTest.java | 414 ++++++++++++++++++ 5 files changed, 1020 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/065bf020/contrib/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/pom.xml b/contrib/pom.xml index c0ec6c7..cf98feb 100755 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -641,6 +641,12 @@ </exclusions> </dependency> <dependency> + <groupId>com.googlecode.json-simple</groupId> + <artifactId>json-simple</artifactId> + <version>1.1.1</version> + <optional>true</optional> + </dependency> + <dependency> <groupId>com.github.fge</groupId> <artifactId>json-schema-validator</artifactId> <version>2.0.1</version> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/065bf020/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/JsonKeyFinder.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/JsonKeyFinder.java 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/JsonKeyFinder.java new file mode 100644 index 0000000..51aee27 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/JsonKeyFinder.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.parser; + +/** + * A concrete implementation of Json ContentHandler<br> + * Matches JSON keys set from the {@link StreamingJsonParser } + * */ +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; + +import org.json.simple.parser.ContentHandler; + +import com.google.protobuf.TextFormat.ParseException; + +public class JsonKeyFinder implements ContentHandler +{ + public JsonKeyFinder() + { + keyValMap = new HashMap<>(); + } + + public int getKeyCount() + { + return keyCount; + } + + public void setKeyCount(int keyCount) + { + this.keyCount = keyCount; + } + + private Object value; + private HashMap<Object, Object> keyValMap; + private int keyCount = 0; + + public HashMap<Object, Object> getKeyValMap() + { + return keyValMap; + } + + public void setKeyValMap(HashMap<Object, Object> keyValMap) + { + this.keyValMap = keyValMap; + } + + private boolean found = false; + private boolean end = false; + private String key; + + private ArrayList<String> matchKeyList; + + public void setMatchKeyList(ArrayList<String> matchKeyList) + { + this.matchKeyList = matchKeyList; + } + + public ArrayList<String> getMatchKeyList() + { + return matchKeyList; + } + + public Object getValue() + { + return value; + } + + public boolean isEnd() + { + return end; + } + + public void setFound(boolean found) + { + this.found = found; + } + + public boolean isFound() + { + return found; + } + + public void startJSON() throws ParseException, IOException + { + found = false; + end = false; + } + + public void endJSON() throws ParseException, IOException + { + end = true; + } + + public boolean primitive(Object value) throws ParseException, IOException + { + if (getMatchKeyList().contains(key)) { + found = true; + this.value = value; + keyValMap.put(key, value); + key = null; + keyCount++; + return false; + } + return true; + } + + public boolean startArray() throws ParseException, IOException + { + return true; + } + + 
public boolean startObject() throws ParseException, IOException + { + return true; + } + + public boolean startObjectEntry(String key) throws ParseException, IOException + { + this.key = key; + return true; + } + + public boolean endArray() throws ParseException, IOException + { + return false; + } + + public boolean endObject() throws ParseException, IOException + { + return true; + } + + public boolean endObjectEntry() throws ParseException, IOException + { + return true; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/065bf020/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParser.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParser.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParser.java new file mode 100644 index 0000000..7cc5821 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParser.java @@ -0,0 +1,426 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.parser; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.StringTokenizer; + +import org.elasticsearch.common.primitives.Ints; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.ClassUtils; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.parser.Parser; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.FieldInfo.SupportType; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.PojoUtils; + +/** + * Operator that parses a JSON string tuple and emits a POJO on the output port + * and tuples that could not be parsed on error port.Upstream operator needs to + * ensure that a full JSON record is emitted.<br> + * <b>Properties</b><br> + * <b>pojoClass</b>:POJO class <br> + * <b>(optional)fieldMappingString</b>String of format + * fieldNameInJson:fieldNameInPOJO:DataType<br> + * <b>Ports</b> <br> + * <b>in</b>:input tuple as a String. 
Each tuple represents a json string<br> + * <b>out</b>:tuples that are validated as per the user defined POJO are emitted + * as POJO on this port<br> + * <b>err</b>:tuples that could not be parsed are emitted on this port as + * KeyValPair<String,String><br> + * Key being the tuple and Val being the reason + * + * @displayName SimpleStreamingJsonParser + * @category Parsers + * @tags json pojo parser streaming + */ +@InterfaceStability.Evolving +public class StreamingJsonParser extends Parser<byte[], KeyValPair<String, String>> +{ + private transient JSONParser jsonParser; + private transient String fieldMappingString; + private transient List<FieldInfo> fieldInfos; + private transient List<ActiveFieldInfo> columnFieldSetters; + protected JsonKeyFinder finder; + private static final String FIELD_SEPARATOR = ":"; + private static final String RECORD_SEPARATOR = ","; + private transient ArrayList<String> columnFields; + private transient Class<?> pojoClass; + + /** + * @return POJO class + */ + private Class<?> getPojoClass() + { + return pojoClass; + } + + /** + * Sets the POJO class + */ + public void setPojoClass(Class<?> pojoClass) + { + this.pojoClass = pojoClass; + } + + /** + * Returns a string representing mapping from generic record to POJO fields + */ + public String getFieldMappingString() + { + return fieldMappingString; + } + + /** + * Comma separated list mapping a field in JSON schema to POJO field eg : + * fieldNameInPOJO:fieldNameInJSON:DataType + */ + public void setFieldMappingString(String pojoFieldsToJsonMapping) + { + this.fieldMappingString = pojoFieldsToJsonMapping; + } + + public StreamingJsonParser() + { + + } + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + } + + @Override + public void processTuple(byte[] tuple) + { + incomingTuplesCount++; + Object obj = convert(tuple); + if (obj != null) { + output.emit(obj); + emittedObjectCount++; + } + } + + /** + * Parse an incoming tuple & return a POJO object + */ + 
@Override + public Object convert(byte[] tuple) + { + String str; + if (tuple == null) { + if (err.isConnected()) { + err.emit(new KeyValPair<String, String>(null, "null tuple")); + } + errorTupleCount++; + return null; + } + + try { + str = new String(tuple, "UTF-8"); + } catch (UnsupportedEncodingException e1) { + if (err.isConnected()) { + err.emit(new KeyValPair<String, String>(tuple.toString(), "Encoding not supported")); + } + errorTupleCount++; + LOG.error("Encoding not supported", e1); + throw new RuntimeException(e1); + } + + try { + finder.setKeyCount(0); + finder.getKeyValMap().clear(); + while (!finder.isEnd()) { + jsonParser.parse(str, finder, true); + //stop parsing when the required keyCount is reached + if (finder.getKeyCount() == columnFields.size()) { + break; + } + } + jsonParser.reset(); + return setPojoFields(finder.getKeyValMap()); + } catch (ParseException | IllegalAccessException | InstantiationException e) { + if (err.isConnected()) { + err.emit(new KeyValPair<String, String>(str, e.getMessage())); + } + errorTupleCount++; + LOG.error("Exception in parsing the record", e); + throw new RuntimeException(e); + } + + } + + /** + * Creates a map representing fieldName in POJO:field in JSON:Data type + * + * @return List of FieldInfo + */ + private List<FieldInfo> createFieldInfoMap(String str) + { + fieldInfos = new ArrayList<FieldInfo>(); + StringTokenizer strtok = new StringTokenizer(str, RECORD_SEPARATOR); + + while (strtok.hasMoreTokens()) { + String[] token = strtok.nextToken().split(FIELD_SEPARATOR); + try { + fieldInfos.add(new FieldInfo(token[0], token[1], SupportType.valueOf(token[2]))); + } catch (Exception e) { + LOG.error("Invalid support type", e); + } + } + return fieldInfos; + } + + @Override + public KeyValPair<String, String> processErrorTuple(byte[] input) + { + throw new UnsupportedOperationException("Not supported"); + } + + /** + * Class that maps fieldInfo to its getters or setters + */ + protected static class 
ActiveFieldInfo + { + final FieldInfo fieldInfo; + Object setterOrGetter; + + ActiveFieldInfo(FieldInfo fieldInfo) + { + this.fieldInfo = fieldInfo; + } + + } + + /** + * A list of {@link FieldInfo}s where each item maps a column name to a pojo + * field name. + */ + private List<FieldInfo> getFieldInfos() + { + return fieldInfos; + } + + /** + * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a + * pojo field name.<br/> + * The value from fieldInfo.column is assigned to + * fieldInfo.pojoFieldExpression. + * + * @description $[].columnName name of the Output Field in POJO + * @description $[].pojoFieldExpression expression to get the respective field + * from generic record + * @useSchema $[].pojoFieldExpression outputPort.fields[].name + */ + private void setFieldInfos(List<FieldInfo> fieldInfos) + { + this.fieldInfos = fieldInfos; + } + + /** + * Use reflection to generate field info values if the user has not provided + * the inputs mapping + * + * @return String representing the POJO field to JSON field mapping + */ + private String generateFieldInfoInputs(Class<?> cls) + { + java.lang.reflect.Field[] fields = cls.getDeclaredFields(); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < fields.length; i++) { + java.lang.reflect.Field f = fields[i]; + Class<?> c = ClassUtils.primitiveToWrapper(f.getType()); + sb.append(f.getName()).append(FIELD_SEPARATOR).append(f.getName()).append(FIELD_SEPARATOR) + .append(c.getSimpleName().toUpperCase()).append(RECORD_SEPARATOR); + } + return sb.substring(0, sb.length() - 1); + } + + /** + * Adds the Active Fields to the columnFieldSetters {@link ActiveFieldInfo}s + */ + private void initColumnFieldSetters(List<FieldInfo> fieldInfos) + { + for (FieldInfo fi : fieldInfos) { + if (columnFieldSetters == null) { + columnFieldSetters = Lists.newArrayList(); + } + columnFieldSetters.add(new StreamingJsonParser.ActiveFieldInfo(fi)); + } + } + + /** + * Initialize the setters for generating the 
POJO + */ + private void initializeActiveFieldSetters() + { + for (int i = 0; i < columnFieldSetters.size(); i++) { + ActiveFieldInfo activeFieldInfo = columnFieldSetters.get(i); + + SupportType st = activeFieldInfo.fieldInfo.getType(); + + switch (st) { + case BOOLEAN: + activeFieldInfo.setterOrGetter = PojoUtils.createSetterBoolean(getPojoClass(), + activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + case DOUBLE: + activeFieldInfo.setterOrGetter = PojoUtils.createSetterDouble(getPojoClass(), + activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + case FLOAT: + activeFieldInfo.setterOrGetter = PojoUtils.createSetterFloat(getPojoClass(), + activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + case INTEGER: + activeFieldInfo.setterOrGetter = PojoUtils.createSetterInt(getPojoClass(), + activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + case STRING: + activeFieldInfo.setterOrGetter = PojoUtils.createSetter(getPojoClass(), + activeFieldInfo.fieldInfo.getPojoFieldExpression(), activeFieldInfo.fieldInfo.getType().getJavaType()); + break; + case LONG: + activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(getPojoClass(), + activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + default: + activeFieldInfo.setterOrGetter = PojoUtils.createSetter(getPojoClass(), + activeFieldInfo.fieldInfo.getPojoFieldExpression(), Byte.class); + break; + } + columnFieldSetters.get(i).setterOrGetter = activeFieldInfo.setterOrGetter; + } + } + + /** + * Returns a POJO from a Generic Record Null is set as the default value if a + * key is not found in the parsed JSON + * + * @return Object + */ + @SuppressWarnings("unchecked") + private Object setPojoFields(HashMap<Object, Object> tuple) throws InstantiationException, IllegalAccessException + { + Object newObj = getPojoClass().newInstance(); + try { + for (int i = 0; i < columnFieldSetters.size(); i++) { + + StreamingJsonParser.ActiveFieldInfo afi = columnFieldSetters.get(i); + 
SupportType st = afi.fieldInfo.getType(); + Object val = null; + + try { + val = tuple.get(afi.fieldInfo.getColumnName()); + } catch (Exception e) { + LOG.error("Could not find field -" + afi.fieldInfo.getColumnName() + "- in the generic record", e); + val = null; + } + + //Nothing to set if a value is null + if (val == null) { + continue; + } + + try { + switch (st) { + case BOOLEAN: + ((PojoUtils.SetterBoolean<Object>)afi.setterOrGetter).set(newObj, + (boolean)tuple.get(afi.fieldInfo.getColumnName())); + break; + case DOUBLE: + ((PojoUtils.SetterDouble<Object>)afi.setterOrGetter).set(newObj, + (double)tuple.get(afi.fieldInfo.getColumnName())); + break; + case INTEGER: + int intVal = Ints.checkedCast((long)tuple.get(afi.fieldInfo.getColumnName())); + ((PojoUtils.SetterInt<Object>)afi.setterOrGetter).set(newObj, intVal); + break; + case STRING: + ((PojoUtils.Setter<Object, String>)afi.setterOrGetter).set(newObj, + new String(tuple.get(afi.fieldInfo.getColumnName()).toString())); + break; + case LONG: + ((PojoUtils.SetterLong<Object>)afi.setterOrGetter).set(newObj, + (long)tuple.get(afi.fieldInfo.getColumnName())); + break; + default: + throw new RuntimeException("Invalid Support Type"); + } + } catch (Exception e) { + LOG.error("Exception in setting value", e); + throw new RuntimeException(e); + } + + } + } catch (Exception ex) { + LOG.error("Generic Exception in setting value" + ex.getMessage()); + newObj = null; + } + return newObj; + } + + @OutputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>() + { + public void setup(PortContext context) + { + jsonParser = new JSONParser(); + finder = new JsonKeyFinder(); + columnFields = new ArrayList<String>(); + columnFieldSetters = Lists.newArrayList(); + + setPojoClass(context.getValue(Context.PortContext.TUPLE_CLASS)); + + if (getFieldMappingString() == null) { + setFieldInfos(createFieldInfoMap(generateFieldInfoInputs(getPojoClass()))); 
+ } else { + setFieldInfos(createFieldInfoMap(getFieldMappingString())); + } + initColumnFieldSetters(getFieldInfos()); + initializeActiveFieldSetters(); + + ListIterator<FieldInfo> itr = fieldInfos.listIterator(); + while (itr.hasNext()) { + columnFields.add(itr.next().getColumnName()); + } + finder.setMatchKeyList(columnFields); + } + }; + + private static final Logger LOG = LoggerFactory.getLogger(StreamingJsonParser.class); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/065bf020/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/package-info.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/package-info.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/package-info.java new file mode 100644 index 0000000..22c2cd4 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +package org.apache.apex.malhar.contrib.parser; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/065bf020/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParserTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParserTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParserTest.java new file mode 100644 index 0000000..3e69a02 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParserTest.java @@ -0,0 +1,414 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.parser; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.List; +import java.util.ListIterator; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.python.google.common.collect.Lists; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.lib.helper.TestPortContext; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.io.fs.AbstractFileInputOperator; +import com.datatorrent.lib.testbench.CollectorTestSink; + +public class StreamingJsonParserTest +{ + public static final String fieldInfoInitMap = "id:id:INTEGER," + "name:name:STRING," + "gpa:gpa:DOUBLE"; + + public static final String nestedFieldInfoMap = "id:id:INTEGER," + "name:name:STRING," + "gpa:gpa:DOUBLE," + + "streetAddress:streetAddress:STRING," + "city:city:STRING," + "state:state:STRING," + + "postalCode:postalCode:STRING"; + + public static final String invalidFieldInfoMap = "Field1:id:INTEGER," + "name:name:STRING," + "gpa:gpa:DOUBLE"; + + private static final String FILENAME = "/tmp/streaming.json"; + + CollectorTestSink<Object> outputSink = new CollectorTestSink<Object>(); + CollectorTestSink<Object> errorSink = new CollectorTestSink<Object>(); + + StreamingJsonParser 
jsonParser = new StreamingJsonParser(); + + private List<String> recordList = null; + + public class TestMeta extends TestWatcher + { + Context.OperatorContext context; + Context.PortContext portContext; + public String dir = null; + + @Override + protected void starting(org.junit.runner.Description description) + { + String methodName = description.getMethodName(); + String className = description.getClassName(); + this.dir = "target/" + className + "/" + methodName; + Attribute.AttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, Person.class); + portContext = new TestPortContext(portAttributes); + super.starting(description); + jsonParser.output.setup(testMeta.portContext); + jsonParser.output.setSink(outputSink); + jsonParser.err.setSink(errorSink); + } + + @Override + protected void finished(Description description) + { + try { + FileUtils.deleteDirectory(new File("target/" + description.getClassName())); + } catch (IOException e) { + throw new RuntimeException(e); + } + jsonParser.teardown(); + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testReads() throws Exception + { + int count = 5; + createReaderInput(count); + jsonParser.setFieldMappingString(fieldInfoInitMap); + jsonParser.setup(testMeta.context); + jsonParser.output.setup(testMeta.portContext); + + jsonParser.beginWindow(0); + ListIterator<String> itr = recordList.listIterator(); + while (itr.hasNext()) { + jsonParser.in.process(itr.next().getBytes()); + } + jsonParser.endWindow(); + + Assert.assertEquals("Number of tuples", count, outputSink.collectedTuples.size()); + Person obj = (Person)outputSink.collectedTuples.get(0); + Assert.assertEquals("Name is", "name-5", obj.getName()); + jsonParser.teardown(); + } + + @Test + public void testNestedReads() throws Exception + { + int count = 4; + createReaderInput(count); + jsonParser.setFieldMappingString(nestedFieldInfoMap); + 
jsonParser.setup(testMeta.context); + jsonParser.output.setup(testMeta.portContext); + + jsonParser.beginWindow(0); + ListIterator<String> itr = recordList.listIterator(); + while (itr.hasNext()) { + jsonParser.in.process(itr.next().getBytes()); + } + jsonParser.endWindow(); + Person obj = (Person)outputSink.collectedTuples.get(0); + Assert.assertEquals("Number of tuples", count, outputSink.collectedTuples.size()); + Assert.assertEquals("Name is", "name-4", obj.getName()); + jsonParser.teardown(); + } + + @Test + public void testReadsWithReflection() throws Exception + { + int count = 6; + createReaderInput(count); + jsonParser.setFieldMappingString(null); + jsonParser.setup(testMeta.context); + jsonParser.output.setup(testMeta.portContext); + + jsonParser.beginWindow(0); + ListIterator<String> itr = recordList.listIterator(); + while (itr.hasNext()) { + jsonParser.in.process(itr.next().getBytes()); + } + jsonParser.endWindow(); + Person obj = (Person)outputSink.collectedTuples.get(0); + Assert.assertEquals("Number of tuples", count, outputSink.collectedTuples.size()); + Assert.assertEquals("Name is", "name-6", obj.getName()); + jsonParser.teardown(); + } + + @Test + public void testInvalidKeyMapping() throws Exception + { + int count = 6; + createReaderInput(count); + jsonParser.setFieldMappingString(invalidFieldInfoMap); + jsonParser.setup(testMeta.context); + jsonParser.output.setup(testMeta.portContext); + + jsonParser.beginWindow(0); + ListIterator<String> itr = recordList.listIterator(); + while (itr.hasNext()) { + jsonParser.in.process(itr.next().getBytes()); + } + jsonParser.endWindow(); + Person obj = (Person)outputSink.collectedTuples.get(0); + Assert.assertEquals("Number of tuples", count, outputSink.collectedTuples.size()); + Assert.assertEquals("Id is", null, obj.getId()); + Assert.assertEquals("Name is", "name-6", obj.getName()); + jsonParser.teardown(); + } + + private void createReaderInput(int count) + { + String address = "\"address\":{" + 
"\"streetAddress\": \"21 2nd Street\"," + "\"city\": \"New York\"," + + "\"state\": \"NY\"," + "\"postalCode\": \"10021\"}"; + recordList = Lists.newArrayList(); + while (count > 0) { + StringBuilder sb = new StringBuilder(); + sb.append("{").append("\"id\"").append(":").append(count).append(","); + sb.append("\"name\":").append("\"").append("name-" + count).append("\"").append(","); + sb.append("\"Elective-0\":").append("\"").append("elective-" + count * 1).append("\"").append(","); + sb.append("\"Elective-1\":").append("\"").append("elective-" + count * 2).append("\"").append(","); + sb.append("\"Elective-2\":").append("\"").append("elective-" + count * 3).append("\"").append(","); + sb.append("\"Elective-3\":").append("\"").append("elective-" + count * 4).append("\"").append(","); + sb.append("\"gpa\":").append(count * 2.0).append(","); + sb.append(address).append("}"); + count--; + recordList.add(sb.toString()); + } + } + + private void writeJsonInputFile(File file) + { + try { + // if file doesnt exists, then create it + if (!file.exists()) { + file.createNewFile(); + } + FileWriter fw = new FileWriter(file.getAbsoluteFile()); + BufferedWriter bw = new BufferedWriter(fw); + ListIterator<String> itr = recordList.listIterator(); + while (itr.hasNext()) { + bw.write(itr.next().toString()); + } + bw.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Test + public void testApplicationWithPojoConversion() throws IOException, Exception + { + try { + FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true); + int cnt = 7; + createReaderInput(cnt); + writeJsonInputFile(new File(FILENAME)); + FileInputOperator fileInput = new FileInputOperator(); + fileInput.setDirectory(testMeta.dir); + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + JsonStreamingParserApp streamingParserApp = new JsonStreamingParserApp(); + streamingParserApp.setParser(jsonParser); + 
streamingParserApp.setFileInput(fileInput); + lma.prepareDAG(streamingParserApp, conf); + LocalMode.Controller lc = lma.getController(); + lc.run(10000);// runs for 10 seconds and quits + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + + public static class FileInputOperator extends AbstractFileInputOperator<String> + { + public final transient DefaultOutputPort<byte[]> output = new DefaultOutputPort<byte[]>(); + + protected transient BufferedReader br; + + @Override + protected InputStream openFile(Path path) throws IOException + { + InputStream is = super.openFile(path); + br = new BufferedReader(new InputStreamReader(is)); + return is; + } + + @Override + protected void closeFile(InputStream is) throws IOException + { + super.closeFile(is); + br.close(); + br = null; + } + + @Override + protected String readEntity() throws IOException + { + return br.readLine(); + } + + @Override + protected void emit(String tuple) + { + output.emit(tuple.getBytes()); + } + } + + public static class JsonStreamingParserApp implements StreamingApplication + { + + StreamingJsonParser parser; + FileInputOperator fileInput; + + public FileInputOperator getFileInput() + { + return fileInput; + } + + public void setFileInput(FileInputOperator fileInput) + { + this.fileInput = fileInput; + } + + public StreamingJsonParser getParser() + { + return parser; + } + + public void setParser(StreamingJsonParser parser) + { + this.parser = parser; + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + FileInputOperator fileInput = dag.addOperator("fileInput", getFileInput()); + StreamingJsonParser parser = dag.addOperator("parser", getParser()); + dag.getMeta(parser).getMeta(parser.output).getAttributes().put(Context.PortContext.TUPLE_CLASS, Person.class); + ConsoleOutputOperator consoleOutput = dag.addOperator("output", new ConsoleOutputOperator()); + dag.addStream("Input", fileInput.output, 
parser.in).setLocality(Locality.CONTAINER_LOCAL); + dag.addStream("pojo", parser.output, consoleOutput.input).setLocality(Locality.CONTAINER_LOCAL); + } + + } + + public static class Person + { + private Integer id; + private String name; + private Double gpa; + private String streetAddress; + private String city; + private String postalCode; + private String state; + + public Integer getId() + { + return id; + } + + public void setId(Integer id) + { + this.id = id; + } + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public Double getGpa() + { + return gpa; + } + + public void setGpa(Double gpa) + { + this.gpa = gpa; + } + + public String getStreetAddress() + { + return streetAddress; + } + + public void setStreetAddress(String streetAddress) + { + this.streetAddress = streetAddress; + } + + public String getCity() + { + return city; + } + + public void setCity(String city) + { + this.city = city; + } + + public String getPostalCode() + { + return postalCode; + } + + public void setPostalCode(String postalCode) + { + this.postalCode = postalCode; + } + + public String getState() + { + return state; + } + + public void setState(String state) + { + this.state = state; + } + } +}
