Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 cec33da88 -> 0a3e00ce0
APEXMALHAR-1962 json parser enhancements Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/0a3e00ce Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/0a3e00ce Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/0a3e00ce Branch: refs/heads/devel-3 Commit: 0a3e00ce0b804ddcb9a2f7bb45dc8698dfd05d84 Parents: cec33da Author: shubham <[email protected]> Authored: Mon Jan 4 12:43:01 2016 +0530 Committer: shubham <[email protected]> Committed: Tue Feb 16 10:53:38 2016 +0530 ---------------------------------------------------------------------- contrib/pom.xml | 6 + .../datatorrent/contrib/parser/JsonParser.java | 247 +++++++++++ .../parser/JsonParserApplicationTest.java | 93 ++++ .../contrib/parser/JsonParserTest.java | 443 +++++++++++++++++++ .../src/test/resources/json-parser-schema.json | 51 +++ .../com/datatorrent/lib/parser/JsonParser.java | 110 ----- .../datatorrent/lib/parser/JsonParserTest.java | 228 ---------- 7 files changed, 840 insertions(+), 338 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/contrib/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/pom.xml b/contrib/pom.xml index 6145fac..b994928 100755 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -622,6 +622,12 @@ <artifactId>gemfire-core</artifactId> <version>1.0.0-incubating.M1</version> <optional>true</optional> + </dependency> + <dependency> + <groupId>com.github.fge</groupId> + <artifactId>json-schema-validator</artifactId> + <version>2.0.1</version> + <optional>true</optional> </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java 
---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java new file mode 100644 index 0000000..b6c3c4d --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.parser; + +import java.io.IOException; +import java.util.Iterator; + +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.fge.jsonschema.exceptions.ProcessingException; +import com.github.fge.jsonschema.main.JsonSchema; +import com.github.fge.jsonschema.main.JsonSchemaFactory; +import com.github.fge.jsonschema.report.ProcessingMessage; +import com.github.fge.jsonschema.report.ProcessingReport; +import com.github.fge.jsonschema.util.JsonLoader; +import com.google.common.annotations.VisibleForTesting; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.lib.parser.Parser; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Operator that parses a json string tuple against a specified json schema and + * emits JSONObject on one port, POJO on other port and tuples that could not be + * parsed on error port.<br> + * Schema is specified in a json format as per http://json-schema.org/ <br> + * Example for the schema can be seen here http://json-schema.org/example1.html <br> + * User can choose to skip validations by not specifying the schema at all. <br> + * <br> + * <b>Properties</b><br> + * <b>jsonSchema</b>:schema as a string<br> + * <b>clazz</b>:Pojo class <br> + * <b>Ports</b> <br> + * <b>in</b>:input tuple as a String. 
Each tuple represents a json string<br> + * <b>parsedOutput</b>:tuples that are validated against the schema are emitted + * as JSONObject on this port<br> + * <b>out</b>:tuples that are validated against the schema are emitted as pojo + * on this port<br> + * <b>err</b>:tuples that do not confine to schema are emitted on this port as + * KeyValPair<String,String><br> + * Key being the tuple and Val being the reason. + * + * + * @displayName JsonParser + * @category Parsers + * @tags json pojo parser + * @since 3.2.0 + */ [email protected] +public class JsonParser extends Parser<byte[], KeyValPair<String, String>> +{ + + /** + * Contents of the schema.Schema is specified as per http://json-schema.org/ + */ + private String jsonSchema; + private transient JsonSchema schema; + private transient ObjectMapper objMapper; + /** + * output port to emit validate records as JSONObject + */ + public transient DefaultOutputPort<JSONObject> parsedOutput = new DefaultOutputPort<JSONObject>(); + /** + * metric to keep count of number of tuples emitted on {@link #parsedOutput} + * port + */ + @AutoMetric + long parsedOutputCount; + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + parsedOutputCount = 0; + } + + @Override + public void setup(OperatorContext context) + { + try { + if (jsonSchema != null) { + JsonSchemaFactory factory = JsonSchemaFactory.byDefault(); + JsonNode schemaNode = JsonLoader.fromString(jsonSchema); + schema = factory.getJsonSchema(schemaNode); + } + objMapper = new ObjectMapper(); + objMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + } catch (ProcessingException | IOException e) { + DTThrowable.wrapIfChecked(e); + } + } + + @Override + public void processTuple(byte[] tuple) + { + if (tuple == null) { + if (err.isConnected()) { + err.emit(new KeyValPair<String, String>(null, "null tuple")); + } + errorTupleCount++; + return; + } + String incomingString = new String(tuple); + try { + if (schema != 
null) { + ProcessingReport report = null; + JsonNode data = JsonLoader.fromString(incomingString); + report = schema.validate(data); + if (report != null && !report.isSuccess()) { + Iterator<ProcessingMessage> iter = report.iterator(); + StringBuilder s = new StringBuilder(); + while (iter.hasNext()) { + ProcessingMessage pm = iter.next(); + s.append(pm.asJson().get("instance").findValue("pointer")).append(":").append(pm.asJson().get("message")) + .append(","); + } + s.setLength(s.length() - 1); + errorTupleCount++; + if (err.isConnected()) { + err.emit(new KeyValPair<String, String>(incomingString, s.toString())); + } + return; + } + } + if (parsedOutput.isConnected()) { + parsedOutput.emit(new JSONObject(incomingString)); + parsedOutputCount++; + } + if (out.isConnected()) { + out.emit(objMapper.readValue(tuple, clazz)); + emittedObjectCount++; + } + } catch (JSONException | ProcessingException | IOException e) { + errorTupleCount++; + if (err.isConnected()) { + err.emit(new KeyValPair<String, String>(incomingString, e.getMessage())); + } + logger.error("Failed to parse json tuple {}, Exception = {} ", tuple, e); + } + } + + @Override + public Object convert(byte[] tuple) + { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public KeyValPair<String, String> processErrorTuple(byte[] input) + { + throw new UnsupportedOperationException("Not supported"); + } + + /** + * Get jsonSchema contents as a string to be used during validation + * + * @return jsonSchema + */ + public String getJsonSchema() + { + return jsonSchema; + } + + /** + * Sets jsonSchema to be used during validation + * + * @param jsonSchema + * schema as a string + */ + public void setJsonSchema(String jsonSchema) + { + this.jsonSchema = jsonSchema; + } + + /** + * Get errorTupleCount + * + * @return errorTupleCount + */ + @VisibleForTesting + public long getErrorTupleCount() + { + return errorTupleCount; + } + + /** + * Get emittedObjectCount + * + * @return 
emittedObjectCount + */ + @VisibleForTesting + public long getEmittedObjectCount() + { + return emittedObjectCount; + } + + /** + * Get incomingTuplesCount + * + * @return incomingTuplesCount + */ + @VisibleForTesting + public long getIncomingTuplesCount() + { + return incomingTuplesCount; + } + + /** + * Set schema. + * + * @param schema + */ + @VisibleForTesting + public void setSchema(JsonSchema schema) + { + this.schema = schema; + } + + private static final Logger logger = LoggerFactory.getLogger(JsonParser.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserApplicationTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserApplicationTest.java b/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserApplicationTest.java new file mode 100644 index 0000000..d1b1efa --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserApplicationTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.parser; + +import java.nio.charset.StandardCharsets; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.contrib.parser.JsonParserTest.Product; +import com.datatorrent.lib.appdata.schemas.SchemaUtils; +import com.datatorrent.lib.io.ConsoleOutputOperator; +/** Runs {@link JsonParser} inside a local-mode DAG for ten seconds and fails the test on DAG constraint violations. */ +public class JsonParserApplicationTest +{ + + @Test + public void testApplication() throws Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new JsonParserTest(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(10000);// runs for 10 seconds and quits + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + /** Emitter -> JsonParser -> console operators. Note: inside JsonParserApplicationTest the simple name JsonParserTest resolves to this nested class, shadowing the top-level test class. */ + public static class JsonParserTest implements StreamingApplication + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + JsonDataEmitterOperator input = dag.addOperator("data", new JsonDataEmitterOperator()); + JsonParser parser = dag.addOperator("jsonparser", new JsonParser()); + parser.setClazz(Product.class); + dag.getMeta(parser).getMeta(parser.out).getAttributes().put(Context.PortContext.TUPLE_CLASS, Product.class); + parser.setJsonSchema(SchemaUtils.jarResourceFileToString("json-parser-schema.json")); + ConsoleOutputOperator jsonObjectOp = dag.addOperator("jsonObjectOp", new ConsoleOutputOperator()); + ConsoleOutputOperator pojoOp = dag.addOperator("pojoOp", new ConsoleOutputOperator()); + jsonObjectOp.setDebug(true); + dag.addStream("input", input.output, 
parser.in); + dag.addStream("output", parser.parsedOutput, jsonObjectOp.input); + dag.addStream("pojo", parser.out, pojoOp.input); + } + } + /** Input operator that emits the same sample product json, UTF-8 encoded, on every emitTuples() call. */ + public static class JsonDataEmitterOperator extends BaseOperator implements InputOperator + { + public static String jsonSample = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1," + + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0," + + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4" + + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}"; + + public final transient DefaultOutputPort<byte[]> output = new DefaultOutputPort<byte[]>(); + + @Override + public void emitTuples() + { + output.emit(jsonSample.getBytes(StandardCharsets.UTF_8)); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserTest.java new file mode 100644 index 0000000..f492597 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserTest.java @@ -0,0 +1,443 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.parser; + +import java.util.Date; +import java.util.List; +import java.util.Map; + +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import com.fasterxml.jackson.annotation.JsonFormat; + +import com.datatorrent.lib.appdata.schemas.SchemaUtils; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.KeyValPair; +/** Unit tests for {@link JsonParser}: schema validation, emission on the JSONObject and POJO ports, error routing and per-window counters. */ +public class JsonParserTest +{ + private static final String filename = "json-parser-schema.json"; + CollectorTestSink<Object> error = new CollectorTestSink<Object>(); + CollectorTestSink<Object> objectPort = new CollectorTestSink<Object>(); + CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>(); + JsonParser parser = new JsonParser(); + /** Before each test: connects the sinks, sets the POJO class and schema, and calls setup; afterwards: clears the sinks and tears the parser down. */ + @Rule + public Watcher watcher = new Watcher(); + + public class Watcher extends TestWatcher + { + @Override + protected void starting(Description description) + { + super.starting(description); + parser.err.setSink(error); + parser.parsedOutput.setSink(objectPort); + parser.out.setSink(pojoPort); + parser.setClazz(Product.class); + parser.setJsonSchema(SchemaUtils.jarResourceFileToString(filename)); + parser.setup(null); + } + + @Override + protected void finished(Description description) + { + super.finished(description); + error.clear(); + objectPort.clear(); + pojoPort.clear(); + parser.teardown(); + } + } + /** A schema-valid tuple is emitted once on each output port and nothing reaches the error port. */ + @Test + 
public void testValidInput() throws JSONException + { + String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1," + + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0," + + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4" + + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}"; + parser.beginWindow(0); + parser.in.process(tuple.getBytes()); + parser.endWindow(); + Assert.assertEquals(1, objectPort.collectedTuples.size()); + Assert.assertEquals(1, pojoPort.collectedTuples.size()); + Assert.assertEquals(0, error.collectedTuples.size()); + Object obj = pojoPort.collectedTuples.get(0); + Assert.assertNotNull(obj); + Assert.assertEquals(Product.class, obj.getClass()); + Product pojo = (Product)obj; + JSONObject jsonObject = (JSONObject)objectPort.collectedTuples.get(0); + Assert.assertEquals(2, jsonObject.getInt("id")); + Assert.assertEquals(1, jsonObject.getInt("price")); + Assert.assertEquals("An ice sculpture", jsonObject.get("name")); + Assert.assertEquals(7.0, jsonObject.getJSONObject("dimensions").getDouble("length"), 0); + Assert.assertEquals(2, pojo.getId()); + Assert.assertEquals(1, pojo.getPrice()); + Assert.assertEquals("An ice sculpture", pojo.getName()); + Assert.assertEquals(7.0, (double)pojo.getDimensions().get("length"), 0); + } + /** With validation disabled, an empty tuple cannot be parsed and goes to the error port. */ + @Test + public void testEmptyInput() throws JSONException + { + parser.setSchema(null); + String tuple = ""; + parser.beginWindow(0); + parser.in.process(tuple.getBytes()); + parser.endWindow(); + Assert.assertEquals(0, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + } + /** With validation disabled, a well-formed tuple is still emitted on both output ports. */ + @Test + public void testValidInputWithoutSchema() throws JSONException + { + parser.setSchema(null); + String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1," + 
+ "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0," + + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4" + + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}"; + parser.beginWindow(0); + parser.in.process(tuple.getBytes()); + parser.endWindow(); + Assert.assertEquals(1, objectPort.collectedTuples.size()); + Assert.assertEquals(1, pojoPort.collectedTuples.size()); + Assert.assertEquals(0, error.collectedTuples.size()); + Object obj = pojoPort.collectedTuples.get(0); + Assert.assertNotNull(obj); + Assert.assertEquals(Product.class, obj.getClass()); + Product pojo = (Product)obj; + JSONObject jsonObject = (JSONObject)objectPort.collectedTuples.get(0); + Assert.assertEquals(2, jsonObject.getInt("id")); + Assert.assertEquals(1, jsonObject.getInt("price")); + Assert.assertEquals("An ice sculpture", jsonObject.get("name")); + Assert.assertEquals(7.0, jsonObject.getJSONObject("dimensions").getDouble("length"), 0); + Assert.assertEquals(2, pojo.getId()); + Assert.assertEquals(1, pojo.getPrice()); + Assert.assertEquals("An ice sculpture", pojo.getName()); + Assert.assertEquals(7.0, (double)pojo.getDimensions().get("length"), 0); + } + /** A field unknown to Product (id2) does not cause an error, and an absent field (warehouseLocation) stays null. */ + @Test + public void testUnknowFieldsInData() throws JSONException + { + parser.setSchema(null); + String tuple = "{" + "\"id\": 2," + "\"id2\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1," + + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0," + + "\"height\": 9.5" + "}" + "}"; + parser.beginWindow(0); + parser.in.process(tuple.getBytes()); + parser.endWindow(); + Assert.assertEquals(1, objectPort.collectedTuples.size()); + Assert.assertEquals(1, pojoPort.collectedTuples.size()); + Assert.assertEquals(0, error.collectedTuples.size()); + Object obj = pojoPort.collectedTuples.get(0); + Assert.assertNotNull(obj); + Assert.assertEquals(Product.class, 
obj.getClass()); + Product pojo = (Product)obj; + JSONObject jsonObject = (JSONObject)objectPort.collectedTuples.get(0); + Assert.assertEquals(2, jsonObject.getInt("id")); + Assert.assertEquals(1, jsonObject.getInt("price")); + Assert.assertEquals("An ice sculpture", jsonObject.get("name")); + Assert.assertEquals(7.0, jsonObject.getJSONObject("dimensions").getDouble("length"), 0); + Assert.assertEquals(2, pojo.getId()); + Assert.assertEquals(1, pojo.getPrice()); + Assert.assertEquals("An ice sculpture", pojo.getName()); + Assert.assertEquals(7.0, (double)pojo.getDimensions().get("length"), 0); + Assert.assertNull(pojo.getWarehouseLocation()); + } + /** A negative price violates the schema minimum; the tuple and the violation text go to the error port. */ + @Test + public void testInvalidPrice() throws JSONException + { + String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": -1," + + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0," + + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4" + + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}"; + parser.beginWindow(0); + parser.in.process(tuple.getBytes()); + parser.endWindow(); + Assert.assertEquals(0, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + KeyValPair<String, String> errorKeyValPair = (KeyValPair<String, String>)error.collectedTuples.get(0); + Assert.assertEquals(tuple, errorKeyValPair.getKey()); + Assert.assertEquals("\"/price\":\"number is lower than the required minimum\"", errorKeyValPair.getValue()); + } + /** Every violation (dimensions missing a required property, negative price) is reported in one comma-separated message. */ + @Test + public void testMultipleViolations() throws JSONException + { + String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": -1," + + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"width\" : 8.0," + "\"height\": 9.5" + "}," + + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 
20.4" + "}," + + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}"; + parser.beginWindow(0); + parser.in.process(tuple.getBytes()); + parser.endWindow(); + Assert.assertEquals(0, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + KeyValPair<String, String> errorKeyValPair = (KeyValPair<String, String>)error.collectedTuples.get(0); + Assert.assertEquals(tuple, errorKeyValPair.getKey()); + Assert.assertEquals( + "\"/dimensions\":\"missing required property(ies)\",\"/price\":\"number is lower than the required minimum\"", + errorKeyValPair.getValue()); + } + /** Malformed json (no comma after the dimensions object) goes to the error port with the original text as key. */ + @Test + public void testJsonSyntaxError() throws JSONException + { + String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": -1," + + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"width\" : 8.0," + "\"height\": 9.5" + "}" + + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4" + "}," + + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}"; + parser.beginWindow(0); + parser.in.process(tuple.getBytes()); + parser.endWindow(); + Assert.assertEquals(0, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(1, error.collectedTuples.size()); + KeyValPair<String, String> errorKeyValPair = (KeyValPair<String, String>)error.collectedTuples.get(0); + Assert.assertEquals(tuple, errorKeyValPair.getKey()); + } + /** With the POJO port disconnected, only the JSONObject port receives the tuple. */ + @Test + public void testValidInputPojoPortNotConnected() throws JSONException + { + parser.out.setSink(null); + String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1," + + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0," + + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4" + + "}," + 
"\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}"; + parser.beginWindow(0); + parser.in.process(tuple.getBytes()); + parser.endWindow(); + Assert.assertEquals(1, objectPort.collectedTuples.size()); + Assert.assertEquals(0, pojoPort.collectedTuples.size()); + Assert.assertEquals(0, error.collectedTuples.size()); + JSONObject jsonObject = (JSONObject)objectPort.collectedTuples.get(0); + Assert.assertEquals(2, jsonObject.getInt("id")); + Assert.assertEquals(1, jsonObject.getInt("price")); + Assert.assertEquals("An ice sculpture", jsonObject.get("name")); + Assert.assertEquals(7.0, jsonObject.getJSONObject("dimensions").getDouble("length"), 0); + } + /** With the JSONObject port disconnected, only the POJO port receives the tuple. */ + @Test + public void testValidInputParsedOutputPortNotConnected() throws JSONException + { + parser.parsedOutput.setSink(null); + String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1," + + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0," + + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4" + + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}"; + parser.beginWindow(0); + parser.in.process(tuple.getBytes()); + parser.endWindow(); + Assert.assertEquals(0, objectPort.collectedTuples.size()); + Assert.assertEquals(1, pojoPort.collectedTuples.size()); + Assert.assertEquals(0, error.collectedTuples.size()); + Object obj = pojoPort.collectedTuples.get(0); + Assert.assertNotNull(obj); + Assert.assertEquals(Product.class, obj.getClass()); + Product pojo = (Product)obj; + Assert.assertEquals(2, pojo.getId()); + Assert.assertEquals(1, pojo.getPrice()); + Assert.assertEquals("An ice sculpture", pojo.getName()); + Assert.assertEquals(7.0, (double)pojo.getDimensions().get("length"), 0); + } + /** A valid tuple increments the parsedOutput, incoming and emitted-object counters and no error counter. */ + @Test + public void testParserValidInputMetricVerification() + { + parser.beginWindow(0); + Assert.assertEquals(0, parser.parsedOutputCount); + 
Assert.assertEquals(0, parser.getIncomingTuplesCount()); + Assert.assertEquals(0, parser.getErrorTupleCount()); + Assert.assertEquals(0, parser.getEmittedObjectCount()); + String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1," + + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0," + + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4" + + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}"; + parser.in.process(tuple.getBytes()); + parser.endWindow(); + Assert.assertEquals(1, parser.parsedOutputCount); + Assert.assertEquals(1, parser.getIncomingTuplesCount()); + Assert.assertEquals(0, parser.getErrorTupleCount()); + Assert.assertEquals(1, parser.getEmittedObjectCount()); + } + /** A tuple missing required properties increments only the incoming and error counters. */ + @Test + public void testParserInvalidInputMetricVerification() + { + parser.beginWindow(0); + Assert.assertEquals(0, parser.parsedOutputCount); + Assert.assertEquals(0, parser.getIncomingTuplesCount()); + Assert.assertEquals(0, parser.getErrorTupleCount()); + Assert.assertEquals(0, parser.getEmittedObjectCount()); + String tuple = "{" + "\"id\": 2" + "}"; + parser.in.process(tuple.getBytes()); + parser.endWindow(); + Assert.assertEquals(0, parser.parsedOutputCount); + Assert.assertEquals(1, parser.getIncomingTuplesCount()); + Assert.assertEquals(1, parser.getErrorTupleCount()); + Assert.assertEquals(0, parser.getEmittedObjectCount()); + } + /** Counters accumulated in one window are reset to zero by the next beginWindow. */ + @Test + public void testParserMetricResetVerification() + { + Assert.assertEquals(0, parser.parsedOutputCount); + Assert.assertEquals(0, parser.getIncomingTuplesCount()); + Assert.assertEquals(0, parser.getErrorTupleCount()); + Assert.assertEquals(0, parser.getEmittedObjectCount()); + String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1," + + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0," + + "\"height\": 9.5" 
+ "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4" + + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}"; + parser.beginWindow(0); + parser.in.process(tuple.getBytes()); + parser.endWindow(); + Assert.assertEquals(1, parser.parsedOutputCount); + Assert.assertEquals(1, parser.getIncomingTuplesCount()); + Assert.assertEquals(0, parser.getErrorTupleCount()); + Assert.assertEquals(1, parser.getEmittedObjectCount()); + parser.beginWindow(1); + Assert.assertEquals(0, parser.parsedOutputCount); + Assert.assertEquals(0, parser.getIncomingTuplesCount()); + Assert.assertEquals(0, parser.getErrorTupleCount()); + Assert.assertEquals(0, parser.getEmittedObjectCount()); + parser.in.process(tuple.getBytes()); + Assert.assertEquals(1, parser.parsedOutputCount); + Assert.assertEquals(1, parser.getIncomingTuplesCount()); + Assert.assertEquals(0, parser.getErrorTupleCount()); + Assert.assertEquals(1, parser.getEmittedObjectCount()); + parser.endWindow(); + } + /** POJO target for the parser; the two date fields are read with explicit {@link JsonFormat} patterns. */ + public static class Product + { + public int id; + public int price; + public String name; + public List<String> tags; + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy/MM/dd") + public Date dateOfManufacture; + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy") + public Date dateOfExpiry; + public Map<String, Object> dimensions; + public Map<String, Object> warehouseLocation; + + public int getId() + { + return id; + } + + public void setId(int id) + { + this.id = id; + } + + public int getPrice() + { + return price; + } + + public void setPrice(int price) + { + this.price = price; + } + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public List<String> getTags() + { + return tags; + } + + public void setTags(List<String> tags) + { + this.tags = tags; + } + + public Date getDateOfManufacture() + { + return dateOfManufacture; + } + + public void 
setDateOfManufacture(Date dateOfManufacture) + { + this.dateOfManufacture = dateOfManufacture; + } + + public Map<String, Object> getDimensions() + { + return dimensions; + } + + public void setDimensions(Map<String, Object> dimensions) + { + this.dimensions = dimensions; + } + + public Map<String, Object> getWarehouseLocation() + { + return warehouseLocation; + } + + public void setWarehouseLocation(Map<String, Object> warehouseLocation) + { + this.warehouseLocation = warehouseLocation; + } + + public Date getDateOfExpiry() + { + return dateOfExpiry; + } + + public void setDateOfExpiry(Date dateOfExpiry) + { + this.dateOfExpiry = dateOfExpiry; + } + + @Override + public String toString() + { + return "Product [id=" + id + ", price=" + price + ", name=" + name + ", tags=" + tags + ", dateOfManufacture=" + + dateOfManufacture + ", dateOfExpiry=" + dateOfExpiry + ", dimensions=" + dimensions + + ", warehouseLocation=" + warehouseLocation + "]"; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/contrib/src/test/resources/json-parser-schema.json ---------------------------------------------------------------------- diff --git a/contrib/src/test/resources/json-parser-schema.json b/contrib/src/test/resources/json-parser-schema.json new file mode 100644 index 0000000..a7a2e6c --- /dev/null +++ b/contrib/src/test/resources/json-parser-schema.json @@ -0,0 +1,51 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "title": "Product set", + "type": "object", + "properties": { + "id": { + "description": "The unique identifier for a product", + "type": "number" + }, + "name": { + "type": "string" + }, + "price": { + "type": "number", + "minimum": 0, + "exclusiveMinimum": true + }, + "tags": { + "type": "array", + "items": { + "type": "string" + }, + "minItems": 1, + "uniqueItems": true + }, + "dimensions": { + "type": "object", + "properties": { + "length": {"type": "number"}, + "width": {"type": "number"}, + "height": {"type": 
"number"} + }, + "required": ["length", "width", "height"] + }, + "warehouseLocation": { + "description": "Coordinates of the warehouse with the product", + "$ref": "http://json-schema.org/geo" + }, + "dateOfManufacture": { + "description": "manufacturing date", + "type": "string" + + }, + "dateOfExpiry": { + "description": "expiry date", + "type": "string" + + } + }, + "required": ["id", "name", "price"] +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java b/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java deleted file mode 100644 index 4e9800a..0000000 --- a/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.lib.parser; - -import java.io.IOException; -import java.text.SimpleDateFormat; - -import org.codehaus.jackson.JsonProcessingException; -import org.codehaus.jackson.map.DeserializationConfig; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.ObjectReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.classification.InterfaceStability; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.netlet.util.DTThrowable; - -/** - * Operator that converts JSON string to Pojo <br> - * <b>Properties</b> <br> - * <b>dateFormat</b>: date format e.g dd/MM/yyyy - * - * @displayName JsonParser - * @category Parsers - * @tags json pojo parser - * @since 3.2.0 - */ [email protected] -public class JsonParser extends Parser<String, String> -{ - - private transient ObjectReader reader; - protected String dateFormat; - - @Override - public void setup(OperatorContext context) - { - try { - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); - if (dateFormat != null) { - mapper.setDateFormat(new SimpleDateFormat(dateFormat)); - } - reader = mapper.reader(clazz); - } catch (Throwable e) { - throw new RuntimeException("Unable find provided class"); - } - } - - @Override - public Object convert(String tuple) - { - try { - if (!StringUtils.isEmpty(tuple)) { - return reader.readValue(tuple); - } - } catch (JsonProcessingException e) { - logger.debug("Error while converting tuple {} {}", tuple, e.getMessage()); - } catch (IOException e) { - DTThrowable.rethrow(e); - } - return null; - } - - @Override - public String processErrorTuple(String input) - { - return input; - } - - /** - * Get the date format - * - * @return Date format string - */ - public String getDateFormat() - { - return dateFormat; - } - - /** - * Set the date format - * - * @param 
dateFormat - */ - public void setDateFormat(String dateFormat) - { - this.dateFormat = dateFormat; - } - - private static final Logger logger = LoggerFactory.getLogger(JsonParser.class); -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java b/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java deleted file mode 100644 index 0400d23..0000000 --- a/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java +++ /dev/null @@ -1,228 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.lib.parser; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.PrintStream; -import java.util.Date; -import java.util.List; - -import org.apache.commons.io.FileUtils; -import org.joda.time.DateTime; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.Description; - -import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher; -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.TestUtils; -import com.datatorrent.lib.util.TestUtils.TestInfo; - -public class JsonParserTest -{ - JsonParser operator; - CollectorTestSink<Object> validDataSink; - CollectorTestSink<String> invalidDataSink; - - final ByteArrayOutputStream myOut = new ByteArrayOutputStream(); - - public JsonParserTest() - { - // So that the output is cleaner. - System.setErr(new PrintStream(myOut)); - } - - @Rule - public TestInfo testMeta = new FSTestWatcher() - { - private void deleteDirectory() - { - try { - FileUtils.deleteDirectory(new File(getDir())); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - - @Override - protected void starting(Description descriptor) - { - - super.starting(descriptor); - deleteDirectory(); - - operator = new JsonParser(); - operator.setClazz(Test1Pojo.class); - validDataSink = new CollectorTestSink<Object>(); - invalidDataSink = new CollectorTestSink<String>(); - TestUtils.setSink(operator.out, validDataSink); - TestUtils.setSink(operator.err, invalidDataSink); - operator.setup(null); - - operator.beginWindow(0); - } - - @Override - protected void finished(Description description) - { - operator.endWindow(); - operator.teardown(); - - deleteDirectory(); - super.finished(description); - } - }; - - @Test - public void testJSONToPOJO() - { - String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}"; - operator.in.put(tuple); - 
Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - Object obj = validDataSink.collectedTuples.get(0); - Assert.assertNotNull(obj); - Assert.assertEquals(Test1Pojo.class, obj.getClass()); - Test1Pojo pojo = (Test1Pojo)obj; - Assert.assertEquals(123, pojo.a); - Assert.assertEquals(234876274, pojo.b); - Assert.assertEquals("HowAreYou?", pojo.c); - Assert.assertEquals(3, pojo.d.size()); - Assert.assertEquals("ABC", pojo.d.get(0)); - Assert.assertEquals("PQR", pojo.d.get(1)); - Assert.assertEquals("XYZ", pojo.d.get(2)); - } - - @Test - public void testJSONToPOJODate() - { - String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}"; - operator.setDateFormat("dd-MM-yyyy"); - operator.setup(null); - operator.in.put(tuple); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - Object obj = validDataSink.collectedTuples.get(0); - Assert.assertNotNull(obj); - Assert.assertEquals(Test1Pojo.class, obj.getClass()); - Test1Pojo pojo = (Test1Pojo)obj; - Assert.assertEquals(123, pojo.a); - Assert.assertEquals(234876274, pojo.b); - Assert.assertEquals("HowAreYou?", pojo.c); - Assert.assertEquals(3, pojo.d.size()); - Assert.assertEquals("ABC", pojo.d.get(0)); - Assert.assertEquals("PQR", pojo.d.get(1)); - Assert.assertEquals("XYZ", pojo.d.get(2)); - Assert.assertEquals(2015, new DateTime(pojo.date).getYear()); - Assert.assertEquals(9, new DateTime(pojo.date).getMonthOfYear()); - Assert.assertEquals(15, new DateTime(pojo.date).getDayOfMonth()); - } - - @Test - public void testJSONToPOJOInvalidData() - { - String tuple = "{\"a\":123\"b\":234876274,\"c\":\"HowAreYou?\"}"; - operator.in.put(tuple); - Assert.assertEquals(0, validDataSink.collectedTuples.size()); - Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); - Assert.assertEquals(tuple, 
invalidDataSink.collectedTuples.get(0)); - } - - @Test - public void testJSONToPOJOUnknownFields() - { - String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"asd\":433.6}"; - operator.in.put(tuple); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - Object obj = validDataSink.collectedTuples.get(0); - Assert.assertNotNull(obj); - Assert.assertEquals(Test1Pojo.class, obj.getClass()); - Test1Pojo pojo = (Test1Pojo)obj; - Assert.assertEquals(123, pojo.a); - Assert.assertEquals(234876274, pojo.b); - Assert.assertEquals("HowAreYou?", pojo.c); - Assert.assertEquals(null, pojo.d); - } - - @Test - public void testJSONToPOJOMismatchingFields() - { - String tuple = "{\"a\":123,\"c\":234876274,\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}"; - operator.in.put(tuple); - Assert.assertEquals(0, validDataSink.collectedTuples.size()); - Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); - Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); - } - - @Test - public void testJSONToPOJOEmptyString() - { - String tuple = ""; - operator.in.put(tuple); - Assert.assertEquals(0, validDataSink.collectedTuples.size()); - Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); - Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); - } - - @Test - public void testJSONToPOJOEmptyJSON() - { - String tuple = "{}"; - operator.in.put(tuple); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - Object obj = validDataSink.collectedTuples.get(0); - Assert.assertNotNull(obj); - Assert.assertEquals(Test1Pojo.class, obj.getClass()); - Test1Pojo pojo = (Test1Pojo)obj; - Assert.assertEquals(0, pojo.a); - Assert.assertEquals(0, pojo.b); - Assert.assertEquals(null, pojo.c); - Assert.assertEquals(null, pojo.d); - } - - @Test - public void testJSONToPOJOArrayInJson() - { - String 
tuple = "{\"a\":123,\"c\":[234,65,23],\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}"; - operator.in.put(tuple); - Assert.assertEquals(0, validDataSink.collectedTuples.size()); - Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); - Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); - } - - public static class Test1Pojo - { - public int a; - public long b; - public String c; - public List<String> d; - public Date date; - - @Override - public String toString() - { - return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + ", date=" + date + "]"; - } - } -}
