Repository: apex-malhar
Updated Branches:
  refs/heads/master ee77dc654 -> c7b8bccb4
APEXMALHAR-2316 APEXMALHAR-2346 Moved suitable code to activate() for initializing tuple class attribute and changed the argument type of DocumentBuilder.parse() to InputSource. Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c7b8bccb Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c7b8bccb Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c7b8bccb Branch: refs/heads/master Commit: c7b8bccb4fbb3dcc66e6552e37be1986ac4459bb Parents: ee77dc6 Author: Hitesh-Scorpio <[email protected]> Authored: Mon Nov 7 14:50:13 2016 +0530 Committer: Hitesh-Scorpio <[email protected]> Committed: Thu Dec 1 18:27:36 2016 +0530 ---------------------------------------------------------------------- .../com/datatorrent/lib/parser/XmlParser.java | 36 ++++-- .../lib/parser/XmlParserApplicationTest.java | 118 +++++++++++++++++++ .../datatorrent/lib/parser/XmlParserTest.java | 6 + 3 files changed, 152 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c7b8bccb/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java index bdf6fad..3b98ea7 100644 --- a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java +++ b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java @@ -18,6 +18,7 @@ */ package com.datatorrent.lib.parser; +import java.io.ByteArrayInputStream; import java.io.IOException; import javax.xml.XMLConstants; @@ -35,6 +36,7 @@ import javax.xml.validation.Validator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; +import org.xml.sax.InputSource; import org.xml.sax.SAXException; import 
org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -42,7 +44,9 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import com.datatorrent.api.Context; import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; import com.datatorrent.lib.util.ReusableStringReader; import com.datatorrent.netlet.util.DTThrowable; @@ -61,11 +65,12 @@ import com.datatorrent.netlet.util.DTThrowable; * @since 3.2.0 */ @InterfaceStability.Evolving -public class XmlParser extends Parser<String, String> +public class XmlParser extends Parser<String, String> implements Operator.ActivationListener<Context> { private String schemaXSDFile; private transient Unmarshaller unmarshaller; private transient Validator validator; + private transient Schema schema; private ReusableStringReader reader = new ReusableStringReader(); public transient DefaultOutputPort<Document> parsedOutput = new DefaultOutputPort<Document>(); @@ -94,7 +99,7 @@ public class XmlParser extends Parser<String, String> DocumentBuilder builder; try { builder = factory.newDocumentBuilder(); - Document doc = builder.parse(inputTuple); + Document doc = builder.parse(new InputSource(new ByteArrayInputStream(inputTuple.getBytes("UTF-8")))); parsedOutput.emit(doc); } catch (Exception e) { @@ -132,8 +137,6 @@ public class XmlParser extends Parser<String, String> public void setup(com.datatorrent.api.Context.OperatorContext context) { try { - JAXBContext ctx = JAXBContext.newInstance(getClazz()); - unmarshaller = ctx.createUnmarshaller(); if (schemaXSDFile != null) { Path filePath = new Path(schemaXSDFile); Configuration configuration = new Configuration(); @@ -141,15 +144,12 @@ public class XmlParser extends Parser<String, String> FSDataInputStream inputStream = fs.open(filePath); SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); - Schema schema = 
factory.newSchema(new StreamSource(inputStream)); - unmarshaller.setSchema(schema); + schema = factory.newSchema(new StreamSource(inputStream)); validator = schema.newValidator(); fs.close(); } } catch (SAXException e) { DTThrowable.wrapIfChecked(e); - } catch (JAXBException e) { - DTThrowable.wrapIfChecked(e); } catch (IOException e) { DTThrowable.wrapIfChecked(e); } @@ -166,4 +166,24 @@ public class XmlParser extends Parser<String, String> } public static Logger LOG = LoggerFactory.getLogger(Parser.class); + + @Override + public void activate(Context context) + { + try { + JAXBContext ctx = JAXBContext.newInstance(getClazz()); + unmarshaller = ctx.createUnmarshaller(); + if (schemaXSDFile != null) { + unmarshaller.setSchema(schema); + } + } catch (JAXBException e) { + DTThrowable.wrapIfChecked(e); + } + } + + @Override + public void deactivate() + { + + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c7b8bccb/library/src/test/java/com/datatorrent/lib/parser/XmlParserApplicationTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/parser/XmlParserApplicationTest.java b/library/src/test/java/com/datatorrent/lib/parser/XmlParserApplicationTest.java new file mode 100644 index 0000000..bd8b4a7 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/parser/XmlParserApplicationTest.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.parser; + +import java.util.concurrent.Callable; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.stram.StramLocalCluster; + +/** + * Test to check if a tuple class can be set properly for XMLParser and + * a simple sample app is running fine for the same.(Ref. 
APEXMALHAR-2316 and APEXMALHAR-2346) + */ +public class XmlParserApplicationTest +{ + public static int TupleCount; + public static com.datatorrent.lib.parser.XmlParserTest.EmployeeBean obj; + @Test + public void testApplication() + { + try { + LocalMode lma = LocalMode.newInstance(); + DAG dag = lma.getDAG(); + XmlDataEmitterOperator input = dag.addOperator("data", new XmlDataEmitterOperator()); + XmlParser parser = dag.addOperator("xmlparser", new XmlParser()); + ResultCollector rc = dag.addOperator("rc", new ResultCollector()); + dag.getMeta(parser).getMeta(parser.out).getAttributes().put(Context.PortContext.TUPLE_CLASS, com.datatorrent.lib.parser.XmlParserTest.EmployeeBean.class); + ConsoleOutputOperator xmlObjectOp = dag.addOperator("xmlObjectOp", new ConsoleOutputOperator()); + xmlObjectOp.setDebug(true); + dag.addStream("input", input.output, parser.in); + dag.addStream("output", parser.parsedOutput, xmlObjectOp.input); + dag.addStream("pojo", parser.out,rc.input); + LocalMode.Controller lc = lma.getController(); + lc.setHeartbeatMonitoringEnabled(false); + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() + { + @Override + public Boolean call() throws Exception + { + return TupleCount == 1; + } + }); + lc.run(10000);// runs for 10 seconds and quits + Assert.assertEquals(1,TupleCount); + Assert.assertEquals("john", obj.getName()); + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + + public static class XmlDataEmitterOperator extends BaseOperator implements InputOperator + { + public static String xmlSample = "<com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>" + "<name>john</name>" + + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>" + + "<city>new york</city>" + "<country>US</country>" + "</address>" + + "</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>"; + public static boolean 
emitTuple = true; + + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>(); + + @Override + public void emitTuples() + { + if (emitTuple ) { + output.emit(xmlSample); + emitTuple = false; + } + } + } + + public static class ResultCollector extends BaseOperator + { + public final transient DefaultInputPort<java.lang.Object> input = new DefaultInputPort<java.lang.Object>() + { + @Override + public void process(java.lang.Object in) + { + obj = (XmlParserTest.EmployeeBean)in; + TupleCount++; + } + }; + + @Override + public void setup(Context.OperatorContext context) + { + TupleCount = 0; + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c7b8bccb/library/src/test/java/com/datatorrent/lib/parser/XmlParserTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/parser/XmlParserTest.java b/library/src/test/java/com/datatorrent/lib/parser/XmlParserTest.java index 670d5b6..685b383 100644 --- a/library/src/test/java/com/datatorrent/lib/parser/XmlParserTest.java +++ b/library/src/test/java/com/datatorrent/lib/parser/XmlParserTest.java @@ -93,6 +93,7 @@ public class XmlParserTest + "</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>"; operator.setup(null); + operator.activate(null); operator.in.process(tuple); Assert.assertEquals(1, validDataSink.collectedTuples.size()); Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); @@ -116,6 +117,7 @@ public class XmlParserTest operator.setClazz(EmployeeBeanOverride.class); operator.setup(null); + operator.activate(null); operator.in.process(tuple); Assert.assertEquals(1, validDataSink.collectedTuples.size()); Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); @@ -140,6 +142,7 @@ public class XmlParserTest String tuple = "<EmployeeBean>" + "<name>john</name>" + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>" + "</EmployeeBean>"; 
operator.setup(null); + operator.activate(null); operator.in.process(tuple); Assert.assertEquals(0, validDataSink.collectedTuples.size()); Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); @@ -191,6 +194,7 @@ public class XmlParserTest + "<dateOfJoining>2015-01-01</dateOfJoining>" + "</EmployeeBean>"; operator.setup(null); + operator.activate(null); operator.in.process(tuple); Assert.assertEquals(1, validDataSink.collectedTuples.size()); Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); @@ -214,6 +218,7 @@ public class XmlParserTest // + "</EmployeeBean>"; // Incorrect XML format operator.setup(null); + operator.activate(null); operator.in.process(tuple); Assert.assertEquals(0, validDataSink.collectedTuples.size()); Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); @@ -229,6 +234,7 @@ public class XmlParserTest + "</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>"; operator.setup(null); + operator.activate(null); operator.in.process(tuple); Assert.assertEquals(1, validDataSink.collectedTuples.size()); Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
