Repository: apex-malhar
Updated Branches:
  refs/heads/master ee77dc654 -> c7b8bccb4


APEXMALHAR-2316 APEXMALHAR-2346 Moved suitable code to activate() for 
initializing tuple class attribute and changed the argument type of 
DocumentBuilder.parse() to InputSource.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c7b8bccb
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c7b8bccb
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c7b8bccb

Branch: refs/heads/master
Commit: c7b8bccb4fbb3dcc66e6552e37be1986ac4459bb
Parents: ee77dc6
Author: Hitesh-Scorpio <[email protected]>
Authored: Mon Nov 7 14:50:13 2016 +0530
Committer: Hitesh-Scorpio <[email protected]>
Committed: Thu Dec 1 18:27:36 2016 +0530

----------------------------------------------------------------------
 .../com/datatorrent/lib/parser/XmlParser.java   |  36 ++++--
 .../lib/parser/XmlParserApplicationTest.java    | 118 +++++++++++++++++++
 .../datatorrent/lib/parser/XmlParserTest.java   |   6 +
 3 files changed, 152 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c7b8bccb/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java 
b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
index bdf6fad..3b98ea7 100644
--- a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
+++ b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
@@ -18,6 +18,7 @@
  */
 package com.datatorrent.lib.parser;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 
 import javax.xml.XMLConstants;
@@ -35,6 +36,7 @@ import javax.xml.validation.Validator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
+import org.xml.sax.InputSource;
 import org.xml.sax.SAXException;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -42,7 +44,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
 import com.datatorrent.lib.util.ReusableStringReader;
 import com.datatorrent.netlet.util.DTThrowable;
 
@@ -61,11 +65,12 @@ import com.datatorrent.netlet.util.DTThrowable;
  * @since 3.2.0
  */
 @InterfaceStability.Evolving
-public class XmlParser extends Parser<String, String>
+public class XmlParser extends Parser<String, String> implements 
Operator.ActivationListener<Context>
 {
   private String schemaXSDFile;
   private transient Unmarshaller unmarshaller;
   private transient Validator validator;
+  private transient Schema schema;
   private ReusableStringReader reader = new ReusableStringReader();
   public transient DefaultOutputPort<Document> parsedOutput = new 
DefaultOutputPort<Document>();
 
@@ -94,7 +99,7 @@ public class XmlParser extends Parser<String, String>
         DocumentBuilder builder;
         try {
           builder = factory.newDocumentBuilder();
-          Document doc = builder.parse(inputTuple);
+          Document doc = builder.parse(new InputSource(new 
ByteArrayInputStream(inputTuple.getBytes("UTF-8"))));
           parsedOutput.emit(doc);
 
         } catch (Exception e) {
@@ -132,8 +137,6 @@ public class XmlParser extends Parser<String, String>
   public void setup(com.datatorrent.api.Context.OperatorContext context)
   {
     try {
-      JAXBContext ctx = JAXBContext.newInstance(getClazz());
-      unmarshaller = ctx.createUnmarshaller();
       if (schemaXSDFile != null) {
         Path filePath = new Path(schemaXSDFile);
         Configuration configuration = new Configuration();
@@ -141,15 +144,12 @@ public class XmlParser extends Parser<String, String>
         FSDataInputStream inputStream = fs.open(filePath);
 
         SchemaFactory factory = 
SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
-        Schema schema = factory.newSchema(new StreamSource(inputStream));
-        unmarshaller.setSchema(schema);
+        schema = factory.newSchema(new StreamSource(inputStream));
         validator = schema.newValidator();
         fs.close();
       }
     } catch (SAXException e) {
       DTThrowable.wrapIfChecked(e);
-    } catch (JAXBException e) {
-      DTThrowable.wrapIfChecked(e);
     } catch (IOException e) {
       DTThrowable.wrapIfChecked(e);
     }
@@ -166,4 +166,24 @@ public class XmlParser extends Parser<String, String>
   }
 
   public static Logger LOG = LoggerFactory.getLogger(Parser.class);
+
+  @Override
+  public void activate(Context context)
+  {
+    try {
+      JAXBContext ctx = JAXBContext.newInstance(getClazz());
+      unmarshaller = ctx.createUnmarshaller();
+      if (schemaXSDFile != null) {
+        unmarshaller.setSchema(schema);
+      }
+    } catch (JAXBException e) {
+      DTThrowable.wrapIfChecked(e);
+    }
+  }
+
+  @Override
+  public void deactivate()
+  {
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c7b8bccb/library/src/test/java/com/datatorrent/lib/parser/XmlParserApplicationTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/parser/XmlParserApplicationTest.java
 
b/library/src/test/java/com/datatorrent/lib/parser/XmlParserApplicationTest.java
new file mode 100644
index 0000000..bd8b4a7
--- /dev/null
+++ 
b/library/src/test/java/com/datatorrent/lib/parser/XmlParserApplicationTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.parser;
+
+import java.util.concurrent.Callable;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Test to check if a tuple class can be set properly for XMLParser and
+ * a simple sample app is running fine for the same.(Ref. APEXMALHAR-2316 and 
APEXMALHAR-2346)
+ */
+public class XmlParserApplicationTest
+{
+  public static int TupleCount;
+  public static com.datatorrent.lib.parser.XmlParserTest.EmployeeBean obj;
+  @Test
+  public void testApplication()
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      DAG dag = lma.getDAG();
+      XmlDataEmitterOperator input = dag.addOperator("data", new 
XmlDataEmitterOperator());
+      XmlParser parser = dag.addOperator("xmlparser", new XmlParser());
+      ResultCollector rc = dag.addOperator("rc", new ResultCollector());
+      
dag.getMeta(parser).getMeta(parser.out).getAttributes().put(Context.PortContext.TUPLE_CLASS,
 com.datatorrent.lib.parser.XmlParserTest.EmployeeBean.class);
+      ConsoleOutputOperator xmlObjectOp = dag.addOperator("xmlObjectOp", new 
ConsoleOutputOperator());
+      xmlObjectOp.setDebug(true);
+      dag.addStream("input", input.output, parser.in);
+      dag.addStream("output", parser.parsedOutput, xmlObjectOp.input);
+      dag.addStream("pojo", parser.out,rc.input);
+      LocalMode.Controller lc = lma.getController();
+      lc.setHeartbeatMonitoringEnabled(false);
+      ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+      {
+        @Override
+        public Boolean call() throws Exception
+        {
+          return TupleCount == 1;
+        }
+      });
+      lc.run(10000);// runs for 10 seconds and quits
+      Assert.assertEquals(1,TupleCount);
+      Assert.assertEquals("john", obj.getName());
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+  public static class XmlDataEmitterOperator extends BaseOperator implements 
InputOperator
+  {
+    public static String xmlSample = 
"<com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>" + 
"<name>john</name>"
+        + "<dept>cs</dept>" + "<eid>1</eid>" + 
"<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>"
+        + "<city>new york</city>" + "<country>US</country>" + "</address>"
+        + 
"</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>";
+    public static boolean emitTuple = true;
+
+    public final transient DefaultOutputPort<String> output = new 
DefaultOutputPort<String>();
+
+    @Override
+    public void emitTuples()
+    {
+      if (emitTuple ) {
+        output.emit(xmlSample);
+        emitTuple = false;
+      }
+    }
+  }
+
+  public static class ResultCollector extends BaseOperator
+  {
+    public final transient DefaultInputPort<java.lang.Object> input = new 
DefaultInputPort<java.lang.Object>()
+    {
+      @Override
+      public void process(java.lang.Object in)
+      {
+        obj = (XmlParserTest.EmployeeBean)in;
+        TupleCount++;
+      }
+    };
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      TupleCount = 0;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c7b8bccb/library/src/test/java/com/datatorrent/lib/parser/XmlParserTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/parser/XmlParserTest.java 
b/library/src/test/java/com/datatorrent/lib/parser/XmlParserTest.java
index 670d5b6..685b383 100644
--- a/library/src/test/java/com/datatorrent/lib/parser/XmlParserTest.java
+++ b/library/src/test/java/com/datatorrent/lib/parser/XmlParserTest.java
@@ -93,6 +93,7 @@ public class XmlParserTest
         + 
"</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>";
 
     operator.setup(null);
+    operator.activate(null);
     operator.in.process(tuple);
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
@@ -116,6 +117,7 @@ public class XmlParserTest
 
     operator.setClazz(EmployeeBeanOverride.class);
     operator.setup(null);
+    operator.activate(null);
     operator.in.process(tuple);
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
@@ -140,6 +142,7 @@ public class XmlParserTest
     String tuple = "<EmployeeBean>" + "<name>john</name>" + "<dept>cs</dept>" 
+ "<eid>1</eid>"
         + "<dateOfJoining>2015-01-01</dateOfJoining>" + "</EmployeeBean>";
     operator.setup(null);
+    operator.activate(null);
     operator.in.process(tuple);
     Assert.assertEquals(0, validDataSink.collectedTuples.size());
     Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
@@ -191,6 +194,7 @@ public class XmlParserTest
         + "<dateOfJoining>2015-01-01</dateOfJoining>" + "</EmployeeBean>";
 
     operator.setup(null);
+    operator.activate(null);
     operator.in.process(tuple);
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
@@ -214,6 +218,7 @@ public class XmlParserTest
     // + "</EmployeeBean>"; // Incorrect XML format
 
     operator.setup(null);
+    operator.activate(null);
     operator.in.process(tuple);
     Assert.assertEquals(0, validDataSink.collectedTuples.size());
     Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
@@ -229,6 +234,7 @@ public class XmlParserTest
         + 
"</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>";
 
     operator.setup(null);
+    operator.activate(null);
     operator.in.process(tuple);
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());

Reply via email to