http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java deleted file mode 100644 index 969da1e..0000000 --- a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java +++ /dev/null @@ -1,244 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.contrib.schema.formatter; - -import java.util.Date; - -import org.joda.time.DateTime; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; - -import com.datatorrent.contrib.schema.formatter.XmlFormatter; -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.TestUtils; - -public class XmlFormatterTest -{ - - XmlFormatter operator; - CollectorTestSink<Object> validDataSink; - CollectorTestSink<String> invalidDataSink; - - @Rule - public Watcher watcher = new Watcher(); - - public class Watcher extends TestWatcher - { - - @Override - protected void starting(Description description) - { - super.starting(description); - operator = new XmlFormatter(); - operator.setClazz(EmployeeBean.class); - operator.setDateFormat("yyyy-MM-dd"); - validDataSink = new CollectorTestSink<Object>(); - invalidDataSink = new CollectorTestSink<String>(); - TestUtils.setSink(operator.out, validDataSink); - TestUtils.setSink(operator.err, invalidDataSink); - } - - @Override - protected void finished(Description description) - { - super.finished(description); - operator.teardown(); - } - - } - - @Test - public void testPojoToXmlWithoutAlias() - { - EmployeeBean e = new EmployeeBean(); - e.setName("john"); - e.setEid(1); - e.setDept("cs"); - e.setDateOfJoining(new DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate()); - - operator.setup(null); - operator.activate(null); - operator.in.process(e); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - String expected = "<com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>" + "<name>john</name>" - + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>" - + "</com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>"; - 
Assert.assertEquals(expected, validDataSink.collectedTuples.get(0)); - } - - @Test - public void testXmlToPojoWithAlias() - { - EmployeeBean e = new EmployeeBean(); - e.setName("john"); - e.setEid(1); - e.setDept("cs"); - e.setDateOfJoining(new DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate()); - - operator.setAlias("EmployeeBean"); - operator.setup(null); - operator.activate(null); - operator.in.process(e); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - String expected = "<EmployeeBean>" + "<name>john</name>" + "<dept>cs</dept>" + "<eid>1</eid>" - + "<dateOfJoining>2015-01-01</dateOfJoining>" + "</EmployeeBean>"; - Assert.assertEquals(expected, validDataSink.collectedTuples.get(0)); - } - - @Test - public void testXmlToPojoWithPrettyPrint() - { - EmployeeBean e = new EmployeeBean(); - e.setName("john"); - e.setEid(1); - e.setDept("cs"); - e.setDateOfJoining(new DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate()); - - operator.setAlias("EmployeeBean"); - operator.setPrettyPrint(true); - operator.setup(null); - operator.activate(null); - operator.in.process(e); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - String expected = "<EmployeeBean>\n" + " <name>john</name>\n" + " <dept>cs</dept>\n" + " <eid>1</eid>\n" - + " <dateOfJoining>2015-01-01</dateOfJoining>\n" + "</EmployeeBean>"; - Assert.assertEquals(expected, validDataSink.collectedTuples.get(0)); - } - - @Test - public void testPojoToXmlWithoutAliasHeirarchical() - { - EmployeeBean e = new EmployeeBean(); - e.setName("john"); - e.setEid(1); - e.setDept("cs"); - e.setDateOfJoining(new DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate()); - Address address = new Address(); - address.setCity("new york"); - address.setCountry("US"); - e.setAddress(address); - - operator.setup(null); - 
operator.activate(null); - operator.in.process(e); - System.out.println(validDataSink.collectedTuples.get(0)); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - String expected = "<com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>" + "<name>john</name>" - + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>" - + "<city>new york</city>" + "<country>US</country>" + "</address>" - + "</com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>"; - Assert.assertEquals(expected, validDataSink.collectedTuples.get(0)); - } - - public static class EmployeeBean - { - - private String name; - private String dept; - private int eid; - private Date dateOfJoining; - private Address address; - - public String getName() - { - return name; - } - - public void setName(String name) - { - this.name = name; - } - - public String getDept() - { - return dept; - } - - public void setDept(String dept) - { - this.dept = dept; - } - - public int getEid() - { - return eid; - } - - public void setEid(int eid) - { - this.eid = eid; - } - - public Date getDateOfJoining() - { - return dateOfJoining; - } - - public void setDateOfJoining(Date dateOfJoining) - { - this.dateOfJoining = dateOfJoining; - } - - public Address getAddress() - { - return address; - } - - public void setAddress(Address address) - { - this.address = address; - } - } - - public static class Address - { - - private String city; - private String country; - - public String getCity() - { - return city; - } - - public void setCity(String city) - { - this.city = city; - } - - public String getCountry() - { - return country; - } - - public void setCountry(String country) - { - this.country = country; - } - - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java deleted file mode 100644 index 9e87496..0000000 --- a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.contrib.schema.parser; - -import java.util.Date; - -import org.joda.time.DateTime; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; - -import com.datatorrent.contrib.schema.parser.CsvParser; -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.TestUtils; - -public class CsvParserTest -{ - - CsvParser operator; - CollectorTestSink<Object> validDataSink; - CollectorTestSink<String> invalidDataSink; - - @Rule - public Watcher watcher = new Watcher(); - - public class Watcher extends TestWatcher - { - - @Override - protected void starting(Description description) - { - super.starting(description); - operator = new CsvParser(); - operator.setClazz(EmployeeBean.class); - operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date"); - validDataSink = new CollectorTestSink<Object>(); - invalidDataSink = new CollectorTestSink<String>(); - TestUtils.setSink(operator.out, validDataSink); - TestUtils.setSink(operator.err, invalidDataSink); - } - - @Override - protected void finished(Description description) - { - super.finished(description); - operator.teardown(); - } - - } - - @Test - public void testCsvToPojoWriterDefault() - { - operator.setup(null); - String tuple = "john,cs,1,01/01/2015"; - operator.in.process(tuple); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - Object obj = validDataSink.collectedTuples.get(0); - Assert.assertNotNull(obj); - Assert.assertEquals(EmployeeBean.class, obj.getClass()); - EmployeeBean pojo = (EmployeeBean)obj; - Assert.assertEquals("john", pojo.getName()); - Assert.assertEquals("cs", pojo.getDept()); - Assert.assertEquals(1, pojo.getEid()); - Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( - 
pojo.getDateOfJoining())); - } - - @Test - public void testCsvToPojoWriterDateFormat() - { - operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy"); - operator.setup(null); - String tuple = "john,cs,1,01-JAN-2015"; - operator.in.process(tuple); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - Object obj = validDataSink.collectedTuples.get(0); - Assert.assertNotNull(obj); - Assert.assertEquals(EmployeeBean.class, obj.getClass()); - EmployeeBean pojo = (EmployeeBean)obj; - Assert.assertEquals("john", pojo.getName()); - Assert.assertEquals("cs", pojo.getDept()); - Assert.assertEquals(1, pojo.getEid()); - Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( - pojo.getDateOfJoining())); - } - - @Test - public void testCsvToPojoWriterDateFormatMultiple() - { - operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy,dateOfBirth:date"); - operator.setup(null); - String tuple = "john,cs,1,01-JAN-2015,01/01/2015"; - operator.in.process(tuple); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - Object obj = validDataSink.collectedTuples.get(0); - Assert.assertNotNull(obj); - Assert.assertEquals(EmployeeBean.class, obj.getClass()); - EmployeeBean pojo = (EmployeeBean)obj; - Assert.assertEquals("john", pojo.getName()); - Assert.assertEquals("cs", pojo.getDept()); - Assert.assertEquals(1, pojo.getEid()); - Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( - pojo.getDateOfJoining())); - Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime( - pojo.getDateOfBirth())); - } - - public static class EmployeeBean - { - - private String name; - private String dept; - private int 
eid; - private Date dateOfJoining; - private Date dateOfBirth; - - public String getName() - { - return name; - } - - public void setName(String name) - { - this.name = name; - } - - public String getDept() - { - return dept; - } - - public void setDept(String dept) - { - this.dept = dept; - } - - public int getEid() - { - return eid; - } - - public void setEid(int eid) - { - this.eid = eid; - } - - public Date getDateOfJoining() - { - return dateOfJoining; - } - - public void setDateOfJoining(Date dateOfJoining) - { - this.dateOfJoining = dateOfJoining; - } - - public Date getDateOfBirth() - { - return dateOfBirth; - } - - public void setDateOfBirth(Date dateOfBirth) - { - this.dateOfBirth = dateOfBirth; - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java deleted file mode 100644 index 5a50ddb..0000000 --- a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java +++ /dev/null @@ -1,230 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.contrib.schema.parser; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.PrintStream; -import java.util.Date; -import java.util.List; - -import org.apache.commons.io.FileUtils; -import org.joda.time.DateTime; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.Description; - -import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher; -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.TestUtils; -import com.datatorrent.lib.util.TestUtils.TestInfo; - -public class JsonParserTest -{ - JsonParser operator; - CollectorTestSink<Object> validDataSink; - CollectorTestSink<String> invalidDataSink; - - final ByteArrayOutputStream myOut = new ByteArrayOutputStream(); - - public JsonParserTest() - { - // So that the output is cleaner. 
- System.setErr(new PrintStream(myOut)); - } - - @Rule - public TestInfo testMeta = new FSTestWatcher() - { - private void deleteDirectory() - { - try { - FileUtils.deleteDirectory(new File(getDir())); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - - @Override - protected void starting(Description descriptor) - { - - super.starting(descriptor); - deleteDirectory(); - - operator = new JsonParser(); - operator.setClazz(Test1Pojo.class); - validDataSink = new CollectorTestSink<Object>(); - invalidDataSink = new CollectorTestSink<String>(); - TestUtils.setSink(operator.out, validDataSink); - TestUtils.setSink(operator.err, invalidDataSink); - operator.setup(null); - operator.activate(null); - - operator.beginWindow(0); - } - - @Override - protected void finished(Description description) - { - operator.endWindow(); - operator.teardown(); - - deleteDirectory(); - super.finished(description); - } - }; - - @Test - public void testJSONToPOJO() - { - String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}"; - operator.in.put(tuple); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - Object obj = validDataSink.collectedTuples.get(0); - Assert.assertNotNull(obj); - Assert.assertEquals(Test1Pojo.class, obj.getClass()); - Test1Pojo pojo = (Test1Pojo)obj; - Assert.assertEquals(123, pojo.a); - Assert.assertEquals(234876274, pojo.b); - Assert.assertEquals("HowAreYou?", pojo.c); - Assert.assertEquals(3, pojo.d.size()); - Assert.assertEquals("ABC", pojo.d.get(0)); - Assert.assertEquals("PQR", pojo.d.get(1)); - Assert.assertEquals("XYZ", pojo.d.get(2)); - } - - @Test - public void testJSONToPOJODate() - { - String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}"; - operator.setDateFormat("dd-MM-yyyy"); - operator.setup(null); - operator.activate(null); - 
operator.in.put(tuple); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - Object obj = validDataSink.collectedTuples.get(0); - Assert.assertNotNull(obj); - Assert.assertEquals(Test1Pojo.class, obj.getClass()); - Test1Pojo pojo = (Test1Pojo)obj; - Assert.assertEquals(123, pojo.a); - Assert.assertEquals(234876274, pojo.b); - Assert.assertEquals("HowAreYou?", pojo.c); - Assert.assertEquals(3, pojo.d.size()); - Assert.assertEquals("ABC", pojo.d.get(0)); - Assert.assertEquals("PQR", pojo.d.get(1)); - Assert.assertEquals("XYZ", pojo.d.get(2)); - Assert.assertEquals(2015, new DateTime(pojo.date).getYear()); - Assert.assertEquals(9, new DateTime(pojo.date).getMonthOfYear()); - Assert.assertEquals(15, new DateTime(pojo.date).getDayOfMonth()); - } - - @Test - public void testJSONToPOJOInvalidData() - { - String tuple = "{\"a\":123\"b\":234876274,\"c\":\"HowAreYou?\"}"; - operator.in.put(tuple); - Assert.assertEquals(0, validDataSink.collectedTuples.size()); - Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); - Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); - } - - @Test - public void testJSONToPOJOUnknownFields() - { - String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"asd\":433.6}"; - operator.in.put(tuple); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - Object obj = validDataSink.collectedTuples.get(0); - Assert.assertNotNull(obj); - Assert.assertEquals(Test1Pojo.class, obj.getClass()); - Test1Pojo pojo = (Test1Pojo)obj; - Assert.assertEquals(123, pojo.a); - Assert.assertEquals(234876274, pojo.b); - Assert.assertEquals("HowAreYou?", pojo.c); - Assert.assertEquals(null, pojo.d); - } - - @Test - public void testJSONToPOJOMismatchingFields() - { - String tuple = "{\"a\":123,\"c\":234876274,\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}"; - 
operator.in.put(tuple); - Assert.assertEquals(0, validDataSink.collectedTuples.size()); - Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); - Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); - } - - @Test - public void testJSONToPOJOEmptyString() - { - String tuple = ""; - operator.in.put(tuple); - Assert.assertEquals(0, validDataSink.collectedTuples.size()); - Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); - Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); - } - - @Test - public void testJSONToPOJOEmptyJSON() - { - String tuple = "{}"; - operator.in.put(tuple); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - Object obj = validDataSink.collectedTuples.get(0); - Assert.assertNotNull(obj); - Assert.assertEquals(Test1Pojo.class, obj.getClass()); - Test1Pojo pojo = (Test1Pojo)obj; - Assert.assertEquals(0, pojo.a); - Assert.assertEquals(0, pojo.b); - Assert.assertEquals(null, pojo.c); - Assert.assertEquals(null, pojo.d); - } - - @Test - public void testJSONToPOJOArrayInJson() - { - String tuple = "{\"a\":123,\"c\":[234,65,23],\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}"; - operator.in.put(tuple); - Assert.assertEquals(0, validDataSink.collectedTuples.size()); - Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); - Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); - } - - public static class Test1Pojo - { - public int a; - public long b; - public String c; - public List<String> d; - public Date date; - - @Override - public String toString() - { - return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + ", date=" + date + "]"; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java ---------------------------------------------------------------------- diff --git 
a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java deleted file mode 100644 index c5f0407..0000000 --- a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java +++ /dev/null @@ -1,272 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.contrib.schema.parser; - -import java.util.Date; - -import org.joda.time.DateTime; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; - -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.TestUtils; - -public class XmlParserTest -{ - XmlParser operator; - CollectorTestSink<Object> validDataSink; - CollectorTestSink<String> invalidDataSink; - - @Rule - public Watcher watcher = new Watcher(); - - public class Watcher extends TestWatcher - { - - @Override - protected void starting(Description description) - { - super.starting(description); - operator = new XmlParser(); - operator.setClazz(EmployeeBean.class); - operator.setDateFormats("yyyy-MM-dd"); //setting default date pattern - validDataSink = new CollectorTestSink<Object>(); - invalidDataSink = new CollectorTestSink<String>(); - TestUtils.setSink(operator.out, validDataSink); - TestUtils.setSink(operator.err, invalidDataSink); - } - - @Override - protected void finished(Description description) - { - super.finished(description); - operator.teardown(); - } - - } - - @Test - public void testXmlToPojoWithoutAlias() - { - String tuple = "<com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>" + "<name>john</name>" - + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>" - + "</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>"; - - operator.setup(null); - operator.activate(null); - operator.in.process(tuple); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - Object obj = validDataSink.collectedTuples.get(0); - Assert.assertNotNull(obj); - Assert.assertEquals(EmployeeBean.class, obj.getClass()); - EmployeeBean pojo = (EmployeeBean)obj; - Assert.assertEquals("john", pojo.getName()); - Assert.assertEquals("cs", 
pojo.getDept()); - Assert.assertEquals(1, pojo.getEid()); - Assert.assertEquals(2015, new DateTime(pojo.getDateOfJoining()).getYear()); - Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getMonthOfYear()); - Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getDayOfMonth()); - } - - @Test - public void testXmlToPojoWithAliasDateFormat() - { - String tuple = "<EmployeeBean>" + "<name>john</name>" + "<dept>cs</dept>" + "<eid>1</eid>" - + "<dateOfJoining>2015-JAN-01</dateOfJoining>" + "</EmployeeBean>"; - - operator.setAlias("EmployeeBean"); - operator.setDateFormats("yyyy-MM-dd,yyyy-MMM-dd"); - operator.setup(null); - operator.activate(null); - operator.in.process(tuple); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - Object obj = validDataSink.collectedTuples.get(0); - Assert.assertNotNull(obj); - Assert.assertEquals(EmployeeBean.class, obj.getClass()); - EmployeeBean pojo = (EmployeeBean)obj; - Assert.assertEquals("john", pojo.getName()); - Assert.assertEquals("cs", pojo.getDept()); - Assert.assertEquals(1, pojo.getEid()); - Assert.assertEquals(2015, new DateTime(pojo.getDateOfJoining()).getYear()); - Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getMonthOfYear()); - Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getDayOfMonth()); - } - - @Test - public void testXmlToPojoWithAlias() - { - String tuple = "<EmployeeBean>" + "<name>john</name>" + "<dept>cs</dept>" + "<eid>1</eid>" - + "<dateOfJoining>2015-01-01</dateOfJoining>" + "</EmployeeBean>"; - - operator.setAlias("EmployeeBean"); - operator.setup(null); - operator.activate(null); - operator.in.process(tuple); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - Object obj = validDataSink.collectedTuples.get(0); - Assert.assertNotNull(obj); - Assert.assertEquals(EmployeeBean.class, obj.getClass()); - 
EmployeeBean pojo = (EmployeeBean)obj; - Assert.assertEquals("john", pojo.getName()); - Assert.assertEquals("cs", pojo.getDept()); - Assert.assertEquals(1, pojo.getEid()); - Assert.assertEquals(2015, new DateTime(pojo.getDateOfJoining()).getYear()); - Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getMonthOfYear()); - Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getDayOfMonth()); - } - - @Test - public void testXmlToPojoIncorrectXML() - { - String tuple = "<EmployeeBean>" - + "<firstname>john</firstname>" //incorrect field name - + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01 00:00:00.00 IST</dateOfJoining>" - + "</EmployeeBean>"; - - operator.setAlias("EmployeeBean"); - operator.setup(null); - operator.activate(null); - operator.in.process(tuple); - Assert.assertEquals(0, validDataSink.collectedTuples.size()); - Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); - Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0)); - } - - @Test - public void testXmlToPojoWithoutAliasHeirarchical() - { - String tuple = "<com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>" + "<name>john</name>" - + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>" - + "<city>new york</city>" + "<country>US</country>" + "</address>" - + "</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>"; - - operator.setup(null); - operator.activate(null); - operator.in.process(tuple); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - Object obj = validDataSink.collectedTuples.get(0); - Assert.assertNotNull(obj); - Assert.assertEquals(EmployeeBean.class, obj.getClass()); - EmployeeBean pojo = (EmployeeBean)obj; - Assert.assertEquals("john", pojo.getName()); - Assert.assertEquals("cs", pojo.getDept()); - Assert.assertEquals(1, pojo.getEid()); - Assert.assertEquals(Address.class, 
pojo.getAddress().getClass()); - Assert.assertEquals("new york", pojo.getAddress().getCity()); - Assert.assertEquals("US", pojo.getAddress().getCountry()); - Assert.assertEquals(2015, new DateTime(pojo.getDateOfJoining()).getYear()); - Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getMonthOfYear()); - Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getDayOfMonth()); - } - - public static class EmployeeBean - { - - private String name; - private String dept; - private int eid; - private Date dateOfJoining; - private Address address; - - public String getName() - { - return name; - } - - public void setName(String name) - { - this.name = name; - } - - public String getDept() - { - return dept; - } - - public void setDept(String dept) - { - this.dept = dept; - } - - public int getEid() - { - return eid; - } - - public void setEid(int eid) - { - this.eid = eid; - } - - public Date getDateOfJoining() - { - return dateOfJoining; - } - - public void setDateOfJoining(Date dateOfJoining) - { - this.dateOfJoining = dateOfJoining; - } - - public Address getAddress() - { - return address; - } - - public void setAddress(Address address) - { - this.address = address; - } - } - - public static class Address - { - - private String city; - private String country; - - public String getCity() - { - return city; - } - - public void setCity(String city) - { - this.city = city; - } - - public String getCountry() - { - return country; - } - - public void setCountry(String country) - { - this.country = country; - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/pom.xml ---------------------------------------------------------------------- diff --git a/library/pom.xml b/library/pom.xml index 8df7a7e..cbc0fb7 100644 --- a/library/pom.xml +++ b/library/pom.xml @@ -300,6 +300,12 @@ <scope>test</scope> </dependency> <dependency> + <!-- required by Xml parser and formatter --> + <groupId>com.thoughtworks.xstream</groupId> 
+ <artifactId>xstream</artifactId> + <version>1.4.8</version> + </dependency> + <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-websocket</artifactId> <version>${jetty.version}</version> @@ -310,6 +316,12 @@ <artifactId>commons-beanutils</artifactId> <version>1.8.3</version> </dependency> + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + <version>2.9.1</version> + </dependency> + </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/main/java/com/datatorrent/lib/converter/Converter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/converter/Converter.java b/library/src/main/java/com/datatorrent/lib/converter/Converter.java new file mode 100644 index 0000000..f1d4325 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/converter/Converter.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package com.datatorrent.lib.converter;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Operators that convert tuples from one format to another must implement
+ * this interface, e.g. parsers or formatters that take data in one format
+ * and convert it to another format.
+ *
+ * @param <INPUT> type of the tuple passed to the convert method
+ * @param <OUTPUT> type of the tuple returned by the convert method
+ * @since 3.2.0
+ */
+@InterfaceStability.Evolving
+public interface Converter<INPUT, OUTPUT>
+{
+  /**
+   * Provide the implementation for converting a tuple from one format to the
+   * other.
+   *
+   * @param tuple
+   *          tuple in the input format
+   * @return tuple in the converted format
+   */
+  public OUTPUT convert(INPUT tuple);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java b/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
new file mode 100644
index 0000000..0de9070
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.formatter;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.converter.Converter;
+
+/**
+ * Abstract class that implements the Converter interface. This is a schema
+ * enabled Formatter <br>
+ * Sub classes need to implement the convert method <br>
+ * <b>Port Interface</b><br>
+ * <b>in</b>: expects {@code Object}; this is a schema enabled port<br>
+ * <b>out</b>: emits {@code OUTPUT} <br>
+ * <b>err</b>: emits {@code Object}; error port that emits the input tuple
+ * that could not be converted<br>
+ * <br>
+ *
+ * @displayName Formatter
+ * @tags formatter converter
+ * @param <OUTPUT> type of the tuple emitted on the out port
+ * @since 3.2.0
+ */
+@InterfaceStability.Evolving
+public abstract class Formatter<OUTPUT> extends BaseOperator implements Converter<Object, OUTPUT>,
+    ActivationListener<Context>
+{
+  /** Class of the tuples to format; set via setClazz or by the in port's setup. */
+  protected transient Class<?> clazz;
+
+  @OutputPortFieldAnnotation
+  public transient DefaultOutputPort<OUTPUT> out = new DefaultOutputPort<OUTPUT>();
+
+  @OutputPortFieldAnnotation(optional = true)
+  public transient DefaultOutputPort<Object> err = new DefaultOutputPort<Object>();
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort<Object> in = new DefaultInputPort<Object>()
+  {
+    /**
+     * Reads the tuple class from the port's TUPLE_CLASS attribute.
+     */
+    @Override
+    public void setup(PortContext context)
+    {
+      clazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+
+    /**
+     * Converts the incoming tuple and emits the result on out. A null result
+     * from convert is treated as a conversion failure: the input tuple goes
+     * to err when that port is connected and nothing is emitted on out.
+     */
+    @Override
+    public void process(Object inputTuple)
+    {
+      OUTPUT tuple = convert(inputTuple);
+      if (tuple == null) {
+        // Never forward a null tuple on out, even when err is not connected.
+        if (err.isConnected()) {
+          err.emit(inputTuple);
+        }
+        return;
+      }
+      if (out.isConnected()) {
+        out.emit(tuple);
+      }
+    }
+  };
+
+  /**
+   * Get the class that needs to be formatted
+   *
+   * @return class of the tuples to format
+   */
+  public Class<?> getClazz()
+  {
+    return clazz;
+  }
+
+  /**
+   * Set the class of tuple that needs to be formatted
+   *
+   * @param clazz
+   *          class of the tuples to format
+   */
+  public void setClazz(Class<?> clazz)
+  {
+    this.clazz = clazz;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java b/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java
new file mode 100644
index 0000000..627bf95
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.formatter;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectWriter;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Operator that converts POJO to JSON string <br>
+ * <b>Properties</b> <br>
+ * <b>dateFormat</b>: date format e.g dd/MM/yyyy
+ *
+ * @displayName JsonFormatter
+ * @category Formatter
+ * @tags pojo json formatter
+ * @since 3.2.0
+ */
+@InterfaceStability.Evolving
+public class JsonFormatter extends Formatter<String>
+{
+  private transient ObjectWriter writer;
+  protected String dateFormat;
+
+  /**
+   * Builds the JSON writer for the configured tuple class, applying the date
+   * format when one is set.
+   *
+   * @param context
+   *          operator context; not used here
+   * @throws RuntimeException
+   *           wrapping whatever failed while building the writer
+   */
+  @Override
+  public void activate(Context context)
+  {
+    try {
+      ObjectMapper mapper = new ObjectMapper();
+      if (dateFormat != null) {
+        mapper.setDateFormat(new SimpleDateFormat(dateFormat));
+      }
+      // Configure the mapper first: the writer is derived from the mapper's
+      // settings at the moment it is created.
+      mapper.configure(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+      mapper.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS, true);
+      mapper.configure(SerializationConfig.Feature.AUTO_DETECT_IS_GETTERS, true);
+      writer = mapper.writerWithType(clazz);
+    } catch (Throwable e) {
+      // Keep the original failure as the cause so it can be diagnosed.
+      throw new RuntimeException("Unable find provided class", e);
+    }
+  }
+
+  @Override
+  public void deactivate()
+  {
+
+  }
+
+  /**
+   * Serializes the tuple to a JSON string.
+   *
+   * @param tuple
+   *          object to serialize
+   * @return the JSON string, or null when JSON generation or mapping fails
+   */
+  @Override
+  public String convert(Object tuple)
+  {
+    try {
+      return writer.writeValueAsString(tuple);
+    } catch (JsonGenerationException | JsonMappingException e) {
+      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+    return null;
+  }
+
+  /**
+   * Get the date format
+   *
+   * @return Date format string
+   */
+  public String getDateFormat()
+  {
+    return dateFormat;
+  }
+
+  /**
+   * Set the date format
+   *
+   * @param dateFormat
+   *          date format pattern, e.g. dd/MM/yyyy
+   */
+  public void setDateFormat(String dateFormat)
+  {
+    this.dateFormat = dateFormat;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(JsonFormatter.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java b/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java
new file mode 100644
index 0000000..35ee7b7
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.formatter;
+
+import java.io.Writer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.XStreamException;
+import com.thoughtworks.xstream.converters.basic.DateConverter;
+import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
+import com.thoughtworks.xstream.io.xml.CompactWriter;
+import com.thoughtworks.xstream.io.xml.XppDriver;
+
+import com.datatorrent.api.Context;
+
+/**
+ * Operator that converts POJO to XML string <br>
+ * <b>Properties</b> <br>
+ * <b>alias</b>: optional element name registered for the tuple class <br>
+ * <b>dateFormat</b>: date format e.g dd/MM/yyyy <br>
+ * <b>prettyPrint</b>: when false a compact writer is used
+ *
+ * @displayName XmlFormatter
+ * @category Formatter
+ * @tags xml pojo formatter
+ * @since 3.2.0
+ */
+@InterfaceStability.Evolving
+public class XmlFormatter extends Formatter<String>
+{
+
+  private transient XStream xstream;
+
+  protected String alias;
+  protected String dateFormat;
+  protected boolean prettyPrint;
+
+  public XmlFormatter()
+  {
+    alias = null;
+    dateFormat = null;
+  }
+
+  /**
+   * Creates the XStream instance and applies the alias and date format when
+   * they are set.
+   *
+   * @param context
+   *          operator context; not used here
+   * @throws RuntimeException
+   *           wrapping whatever failed while registering the alias
+   */
+  @Override
+  public void activate(Context context)
+  {
+    if (prettyPrint) {
+      // Pretty print keeps XStream's default writer.
+      xstream = new XStream();
+    } else {
+      // Otherwise swap in a CompactWriter through a custom driver.
+      xstream = new XStream(new XppDriver()
+      {
+        @Override
+        public HierarchicalStreamWriter createWriter(Writer out)
+        {
+          return new CompactWriter(out, getNameCoder());
+        }
+      });
+    }
+    if (alias != null) {
+      try {
+        xstream.alias(alias, clazz);
+      } catch (Throwable e) {
+        // Keep the original failure as the cause so it can be diagnosed.
+        throw new RuntimeException("Unable find provided class", e);
+      }
+    }
+    if (dateFormat != null) {
+      xstream.registerConverter(new DateConverter(dateFormat, new String[] {}));
+    }
+  }
+
+  @Override
+  public void deactivate()
+  {
+
+  }
+
+  /**
+   * Serializes the tuple to an XML string.
+   *
+   * @param tuple
+   *          object to serialize
+   * @return the XML string, or null when XStream reports an error
+   */
+  @Override
+  public String convert(Object tuple)
+  {
+    try {
+      return xstream.toXML(tuple);
+    } catch (XStreamException e) {
+      logger.debug("Error while converting tuple {} {} ",tuple,e.getMessage());
+      return null;
+    }
+  }
+
+  /**
+   * Gets the alias. This is an optional step. Without it XStream would work
+   * fine, but the XML element names would contain the fully qualified name of
+   * each class (including package) which would bulk up the XML a bit.
+   *
+   * @return alias.
+   */
+  public String getAlias()
+  {
+    return alias;
+  }
+
+  /**
+   * Sets the alias. This is an optional step. Without it XStream would work
+   * fine, but the XML element names would contain the fully qualified name of
+   * each class (including package) which would bulk up the XML a bit.
+   *
+   * @param alias
+   *          element name to use for the tuple class
+   */
+  public void setAlias(String alias)
+  {
+    this.alias = alias;
+  }
+
+  /**
+   * Gets the date format e.g dd/MM/yyyy - this will be how a date would be
+   * formatted
+   *
+   * @return dateFormat.
+   */
+  public String getDateFormat()
+  {
+    return dateFormat;
+  }
+
+  /**
+   * Sets the date format e.g dd/MM/yyyy - this will be how a date would be
+   * formatted
+   *
+   * @param dateFormat
+   *          date format pattern
+   */
+  public void setDateFormat(String dateFormat)
+  {
+    this.dateFormat = dateFormat;
+  }
+
+  /**
+   * Returns true if pretty print is enabled.
+   *
+   * @return prettyPrint
+   */
+  public boolean isPrettyPrint()
+  {
+    return prettyPrint;
+  }
+
+  /**
+   * Sets pretty print option.
+   *
+   * @param prettyPrint
+   *          true to enable pretty print
+   */
+  public void setPrettyPrint(boolean prettyPrint)
+  {
+    this.prettyPrint = prettyPrint;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(XmlFormatter.class);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java b/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
new file mode 100644
index 0000000..3727d86
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.parser;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Operator that converts JSON string to Pojo <br>
+ * <b>Properties</b> <br>
+ * <b>dateFormat</b>: date format e.g dd/MM/yyyy
+ *
+ * @displayName JsonParser
+ * @category Parsers
+ * @tags json pojo parser
+ * @since 3.2.0
+ */
+@InterfaceStability.Evolving
+public class JsonParser extends Parser<String>
+{
+
+  private transient ObjectReader reader;
+  protected String dateFormat;
+
+  /**
+   * Builds the JSON reader for the configured tuple class. Unknown JSON
+   * properties are ignored and the date format is applied when one is set.
+   *
+   * @param context
+   *          operator context; not used here
+   * @throws RuntimeException
+   *           wrapping whatever failed while building the reader
+   */
+  @Override
+  public void activate(Context context)
+  {
+    try {
+      ObjectMapper mapper = new ObjectMapper();
+      mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+      if (dateFormat != null) {
+        mapper.setDateFormat(new SimpleDateFormat(dateFormat));
+      }
+      reader = mapper.reader(clazz);
+    } catch (Throwable e) {
+      // Keep the original failure as the cause so it can be diagnosed.
+      throw new RuntimeException("Unable find provided class", e);
+    }
+  }
+
+  @Override
+  public void deactivate()
+  {
+  }
+
+  /**
+   * Parses the JSON string into an object of the configured class.
+   *
+   * @param tuple
+   *          JSON string to parse
+   * @return the parsed object, or null when the string is empty or cannot be
+   *         processed as JSON
+   */
+  @Override
+  public Object convert(String tuple)
+  {
+    try {
+      if (!StringUtils.isEmpty(tuple)) {
+        return reader.readValue(tuple);
+      }
+    } catch (JsonProcessingException e) {
+      logger.debug("Error while converting tuple {} {}", tuple, e.getMessage());
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+    return null;
+  }
+
+  /**
+   * Get the date format
+   *
+   * @return Date format string
+   */
+  public String getDateFormat()
+  {
+    return dateFormat;
+  }
+
+  /**
+   * Set the date format
+   *
+   * @param dateFormat
+   *          date format pattern, e.g. dd/MM/yyyy
+   */
+  public void setDateFormat(String dateFormat)
+  {
+    this.dateFormat = dateFormat;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(JsonParser.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/main/java/com/datatorrent/lib/parser/Parser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/Parser.java b/library/src/main/java/com/datatorrent/lib/parser/Parser.java
new file mode 100644
index 0000000..c9455e2
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/parser/Parser.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.parser;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.converter.Converter;
+
+/**
+ * Abstract class that implements Converter interface.
This is a schema enabled
+ * Parser <br>
+ * Sub classes need to implement the convert method <br>
+ * <br>
+ * <b>Port Interface</b><br>
+ * <b>in</b>: expects {@code INPUT}<br>
+ * <b>out</b>: emits {@code Object}; this is a schema enabled port<br>
+ * <b>err</b>: emits {@code INPUT}; error port that emits the input tuple
+ * that could not be converted<br>
+ * <br>
+ *
+ * @displayName Parser
+ * @tags parser converter
+ * @param <INPUT> type of the tuple received on the in port
+ * @since 3.2.0
+ */
+@InterfaceStability.Evolving
+public abstract class Parser<INPUT> extends BaseOperator implements Converter<INPUT, Object>,
+    ActivationListener<Context>
+{
+  /** Class of the parsed tuples; set via setClazz or by the out port's setup. */
+  protected transient Class<?> clazz;
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultOutputPort<Object> out = new DefaultOutputPort<Object>()
+  {
+    /**
+     * Reads the tuple class from the port's TUPLE_CLASS attribute.
+     */
+    @Override
+    public void setup(PortContext context)
+    {
+      clazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+  };
+
+  @OutputPortFieldAnnotation(optional = true)
+  public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
+
+  public transient DefaultInputPort<INPUT> in = new DefaultInputPort<INPUT>()
+  {
+    /**
+     * Converts the incoming tuple and emits the result on out. A null result
+     * from convert is treated as a conversion failure: the input tuple goes
+     * to err when that port is connected and nothing is emitted on out.
+     */
+    @Override
+    public void process(INPUT inputTuple)
+    {
+      Object tuple = convert(inputTuple);
+      if (tuple == null) {
+        // Never forward a null tuple on out, even when err is not connected.
+        if (err.isConnected()) {
+          err.emit(inputTuple);
+        }
+        return;
+      }
+      if (out.isConnected()) {
+        out.emit(tuple);
+      }
+    }
+  };
+
+  /**
+   * Get the class that the input is parsed into
+   *
+   * @return class of the parsed tuples
+   */
+  public Class<?> getClazz()
+  {
+    return clazz;
+  }
+
+  /**
+   * Set the class that the input is parsed into
+   *
+   * @param clazz
+   *          class of the parsed tuples
+   */
+  public void setClazz(Class<?> clazz)
+  {
+    this.clazz = clazz;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
new file mode
100644
index 0000000..888837d
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.parser;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.XStreamException;
+import com.thoughtworks.xstream.converters.basic.DateConverter;
+
+import com.datatorrent.api.Context;
+
+/**
+ * Operator that converts XML string to Pojo <br>
+ * <b>Properties</b> <br>
+ * <b>alias</b>: This maps to the root element of the XML string. If not
+ * specified, parser would expect the root element to be fully qualified name of
+ * the Pojo Class. <br>
+ * <b>dateFormats</b>: Comma separated string of date formats e.g
+ * dd/MM/yyyy,dd-MMM-yyyy where first one would be considered default
+ *
+ * @displayName XmlParser
+ * @category Parsers
+ * @tags xml pojo parser
+ * @since 3.2.0
+ */
+@InterfaceStability.Evolving
+public class XmlParser extends Parser<String>
+{
+
+  private transient XStream xstream;
+  protected String alias;
+  protected String dateFormats;
+
+  public XmlParser()
+  {
+    alias = null;
+    dateFormats = null;
+  }
+
+  /**
+   * Creates the XStream instance and applies the alias and date formats when
+   * they are set.
+   *
+   * @param context
+   *          operator context; not used here
+   * @throws RuntimeException
+   *           wrapping whatever failed while registering the alias
+   */
+  @Override
+  public void activate(Context context)
+  {
+    // NOTE(review): XStream is created with its default configuration; if the
+    // XML can come from an untrusted source, confirm which types it may load.
+    xstream = new XStream();
+    if (alias != null) {
+      try {
+        xstream.alias(alias, clazz);
+      } catch (Throwable e) {
+        // Keep the original failure as the cause so it can be diagnosed.
+        throw new RuntimeException("Unable find provided class", e);
+      }
+    }
+    if (dateFormats != null) {
+      String[] dateFormat = dateFormats.split(",");
+      // Tolerate blanks around the commas, e.g. "dd/MM/yyyy, dd-MMM-yyyy".
+      for (int i = 0; i < dateFormat.length; i++) {
+        dateFormat[i] = dateFormat[i].trim();
+      }
+      xstream.registerConverter(new DateConverter(dateFormat[0], dateFormat));
+    }
+  }
+
+  @Override
+  public void deactivate()
+  {
+
+  }
+
+  /**
+   * Parses the XML string into an object.
+   *
+   * @param tuple
+   *          XML string to parse
+   * @return the parsed object, or null when the string is null or empty or
+   *         XStream reports an error
+   */
+  @Override
+  public Object convert(String tuple)
+  {
+    // Same contract as JsonParser: nothing to parse means no result.
+    if (tuple == null || tuple.isEmpty()) {
+      return null;
+    }
+    try {
+      return xstream.fromXML(tuple);
+    } catch (XStreamException e) {
+      logger.debug("Error while converting tuple {} {}", tuple,e.getMessage());
+      return null;
+    }
+  }
+
+  /**
+   * Gets the alias
+   *
+   * @return alias.
+   */
+  public String getAlias()
+  {
+    return alias;
+  }
+
+  /**
+   * Sets the alias. This maps to the root element of the XML string. If not
+   * specified, parser would expect the root element to be fully qualified name
+   * of the Pojo Class.
+   *
+   * @param alias
+   *          root element name of the XML string
+   */
+  public void setAlias(String alias)
+  {
+    this.alias = alias;
+  }
+
+  /**
+   * Gets the comma separated string of date formats e.g dd/MM/yyyy,dd-MMM-yyyy
+   * where first one would be considered default
+   *
+   * @return dateFormats.
+   */
+  public String getDateFormats()
+  {
+    return dateFormats;
+  }
+
+  /**
+   * Sets the comma separated string of date formats e.g dd/MM/yyyy,dd-MMM-yyyy
+   * where first one would be considered default
+   *
+   * @param dateFormats
+   *          comma separated date format patterns
+   */
+  public void setDateFormats(String dateFormats)
+  {
+    this.dateFormats = dateFormats;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(XmlParser.class);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java b/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
new file mode 100644
index 0000000..bde544e
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.formatter;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.lib.util.TestUtils.TestInfo;
+import com.google.common.collect.Lists;
+
+/**
+ * Tests for JsonFormatter: each test feeds a POJO to the operator's input
+ * port and checks what arrives on the sinks attached to out and err.
+ */
+public class JsonFormatterTest
+{
+  JsonFormatter operator;
+  CollectorTestSink<Object> validDataSink;
+  CollectorTestSink<String> invalidDataSink;
+
+  final ByteArrayOutputStream myOut = new ByteArrayOutputStream();
+
+  // Captured before the constructor body redirects System.err, so that it can
+  // be put back once the test has finished.
+  private final PrintStream originalErr = System.err;
+
+  public JsonFormatterTest()
+  {
+    // So that the output is cleaner.
+    System.setErr(new PrintStream(myOut));
+  }
+
+  @Rule
+  public TestInfo testMeta = new FSTestWatcher()
+  {
+    private void deleteDirectory()
+    {
+      try {
+        FileUtils.deleteDirectory(new File(getDir()));
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    @Override
+    protected void starting(Description descriptor)
+    {
+      super.starting(descriptor);
+      deleteDirectory();
+
+      operator = new JsonFormatter();
+
+      validDataSink = new CollectorTestSink<Object>();
+      invalidDataSink = new CollectorTestSink<String>();
+      TestUtils.setSink(operator.out, validDataSink);
+      TestUtils.setSink(operator.err, invalidDataSink);
+      operator.setup(null);
+      operator.activate(null);
+
+      operator.beginWindow(0);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      try {
+        operator.endWindow();
+        operator.teardown();
+
+        deleteDirectory();
+        super.finished(description);
+      } finally {
+        // Undo the redirect made in the constructor.
+        System.setErr(originalErr);
+      }
+    }
+  };
+
+  @Test
+  public void testJSONToPOJO()
+  {
+    Test1Pojo pojo = new Test1Pojo();
+    pojo.a = 123;
+    pojo.b = 234876274;
+    pojo.c = "HowAreYou?";
+    pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ");
+
+    operator.in.put(pojo);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":null}";
+    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testJSONToPOJODate()
+  {
+    Test1Pojo pojo = new Test1Pojo();
+    pojo.a = 123;
+    pojo.b = 234876274;
+    pojo.c = "HowAreYou?";
+    pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ");
+    pojo.date = new DateTime().withYear(2015).withMonthOfYear(9).withDayOfMonth(15).toDate();
+    // The date format is read in activate, so the operator is activated again.
+    operator.setDateFormat("dd-MM-yyyy");
+    operator.setup(null);
+    operator.activate(null);
+    operator.in.put(pojo);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}";
+    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testJSONToPOJONullFields()
+  {
+    Test1Pojo pojo = new Test1Pojo();
+    pojo.a = 123;
+    pojo.b = 234876274;
+    pojo.c = "HowAreYou?";
+    pojo.d = null;
+
+    operator.in.put(pojo);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":null,\"date\":null}";
+    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testJSONToPOJOEmptyPOJO()
+  {
+    Test1Pojo pojo = new Test1Pojo();
+    operator.in.put(pojo);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String expectedJSONString = "{\"a\":0,\"b\":0,\"c\":null,\"d\":null,\"date\":null}";
+    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testJSONToPOJONullPOJO()
+  {
+    operator.in.put(null);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String expectedJSONString = "null";
+    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testJSONToPOJONoFieldPOJO()
+  {
+    operator.endWindow();
+    operator.teardown();
+    operator.setClazz(Test2Pojo.class);
+    operator.setup(null);
+    operator.beginWindow(1);
+
+    Test2Pojo o = new Test2Pojo();
+    operator.in.put(o);
+    Assert.assertEquals(0, validDataSink.collectedTuples.size());
+    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
+    Assert.assertEquals(o, invalidDataSink.collectedTuples.get(0));
+  }
+
+  public static class Test1Pojo
+  {
+    public int a;
+    public long b;
+    public String c;
+    public List<String> d;
+    public Date date;
+
+    @Override
+    public String toString()
+    {
+      return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + ", date=" + date + "]";
+    }
+  }
+
+  public static class Test2Pojo
+  {
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java b/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
new file mode 100644
index 0000000..237ae5a
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.formatter;
+
+import java.util.Date;
+
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Tests for {@link XmlFormatter}: a POJO is pushed into the operator's input port and the
+ * string collected on the output port is compared with the expected XML.
+ */
+public class XmlFormatterTest
+{
+
+  /**
+   * Root element name expected when no alias is configured. It is derived from the bean's
+   * binary class name with the nested-class separator {@code $} written as {@code _-}, the
+   * same shape the previously hard-coded expectation had. Deriving it keeps the tests correct
+   * when this class changes package; the hard-coded value still named the old
+   * {@code com.datatorrent.contrib.schema.formatter} package.
+   */
+  private static final String DEFAULT_ROOT_TAG = EmployeeBean.class.getName().replace("$", "_-");
+
+  XmlFormatter operator;
+  CollectorTestSink<Object> validDataSink;
+  CollectorTestSink<String> invalidDataSink;
+
+  @Rule
+  public Watcher watcher = new Watcher();
+
+  /**
+   * Creates a fresh operator wired to two collector sinks before each test and tears the
+   * operator down afterwards. {@code setup}/{@code activate} are left to the individual tests
+   * so that they can set further properties first.
+   */
+  public class Watcher extends TestWatcher
+  {
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      operator = new XmlFormatter();
+      operator.setClazz(EmployeeBean.class);
+      operator.setDateFormat("yyyy-MM-dd");
+      validDataSink = new CollectorTestSink<Object>();
+      invalidDataSink = new CollectorTestSink<String>();
+      TestUtils.setSink(operator.out, validDataSink);
+      TestUtils.setSink(operator.err, invalidDataSink);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+      operator.teardown();
+    }
+
+  }
+
+  /**
+   * Builds the employee used by every test: john / cs / eid 1, joined on 2015-01-01.
+   */
+  private static EmployeeBean newEmployee()
+  {
+    EmployeeBean e = new EmployeeBean();
+    e.setName("john");
+    e.setEid(1);
+    e.setDept("cs");
+    e.setDateOfJoining(new DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate());
+    return e;
+  }
+
+  /**
+   * Sets up and activates the operator, processes {@code bean}, and checks that exactly one
+   * tuple reached the output port and none reached the error port.
+   *
+   * @param bean the POJO to format
+   * @return the single tuple collected on the output port
+   */
+  private Object formatSingle(EmployeeBean bean)
+  {
+    operator.setup(null);
+    operator.activate(null);
+    operator.in.process(bean);
+    // Check the counts before touching get(0) so an empty sink fails as an assertion
+    // rather than as an IndexOutOfBoundsException.
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    return validDataSink.collectedTuples.get(0);
+  }
+
+  /**
+   * Without an alias the root element is named after the bean class.
+   */
+  @Test
+  public void testPojoToXmlWithoutAlias()
+  {
+    String expected = "<" + DEFAULT_ROOT_TAG + ">" + "<name>john</name>"
+        + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>"
+        + "</" + DEFAULT_ROOT_TAG + ">";
+    Assert.assertEquals(expected, formatSingle(newEmployee()));
+  }
+
+  /**
+   * With an alias set, the alias is used as the root element name. (Despite the method name,
+   * the direction exercised here is POJO to XML.)
+   */
+  @Test
+  public void testXmlToPojoWithAlias()
+  {
+    operator.setAlias("EmployeeBean");
+    String expected = "<EmployeeBean>" + "<name>john</name>" + "<dept>cs</dept>" + "<eid>1</eid>"
+        + "<dateOfJoining>2015-01-01</dateOfJoining>" + "</EmployeeBean>";
+    Assert.assertEquals(expected, formatSingle(newEmployee()));
+  }
+
+  /**
+   * With pretty printing enabled, every child element is emitted on its own indented line.
+   */
+  @Test
+  public void testXmlToPojoWithPrettyPrint()
+  {
+    operator.setAlias("EmployeeBean");
+    operator.setPrettyPrint(true);
+    // NOTE(review): the indent width inside these literals must match the formatter's
+    // pretty-print output exactly - confirm against the operator before relying on it.
+    String expected = "<EmployeeBean>\n" + " <name>john</name>\n" + " <dept>cs</dept>\n" + " <eid>1</eid>\n"
+        + " <dateOfJoining>2015-01-01</dateOfJoining>\n" + "</EmployeeBean>";
+    Assert.assertEquals(expected, formatSingle(newEmployee()));
+  }
+
+  /**
+   * A nested bean (the address) is rendered as a nested element of the employee.
+   */
+  @Test
+  public void testPojoToXmlWithoutAliasHeirarchical()
+  {
+    EmployeeBean e = newEmployee();
+    Address address = new Address();
+    address.setCity("new york");
+    address.setCountry("US");
+    e.setAddress(address);
+
+    String expected = "<" + DEFAULT_ROOT_TAG + ">" + "<name>john</name>"
+        + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>"
+        + "<city>new york</city>" + "<country>US</country>" + "</address>"
+        + "</" + DEFAULT_ROOT_TAG + ">";
+    Assert.assertEquals(expected, formatSingle(e));
+  }
+
+  /**
+   * Bean formatted by the tests; the field declaration order matches the element order in
+   * the expected XML.
+   */
+  public static class EmployeeBean
+  {
+
+    private String name;
+    private String dept;
+    private int eid;
+    private Date dateOfJoining;
+    private Address address;
+
+    public String getName()
+    {
+      return name;
+    }
+
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    public String getDept()
+    {
+      return dept;
+    }
+
+    public void setDept(String dept)
+    {
+      this.dept = dept;
+    }
+
+    public int getEid()
+    {
+      return eid;
+    }
+
+    public void setEid(int eid)
+    {
+      this.eid = eid;
+    }
+
+    public Date getDateOfJoining()
+    {
+      return dateOfJoining;
+    }
+
+    public void setDateOfJoining(Date dateOfJoining)
+    {
+      this.dateOfJoining = dateOfJoining;
+    }
+
+    public Address getAddress()
+    {
+      return address;
+    }
+
+    public void setAddress(Address address)
+    {
+      this.address = address;
+    }
+  }
+
+  /**
+   * Nested bean used by the hierarchical test.
+   */
+  public static class Address
+  {
+
+    private String city;
+    private String country;
+
+    public String getCity()
+    {
+      return city;
+    }
+
+    public void setCity(String city)
+    {
+      this.city = city;
+    }
+
+    public String getCountry()
+    {
+      return country;
+    }
+
+    public void setCountry(String country)
+    {
+      this.country = country;
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java b/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java
new file mode 100644
index 0000000..d091267
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.parser;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.lib.util.TestUtils.TestInfo;
+
+/**
+ * Tests for {@link JsonParser}: JSON strings are put on the operator's input port; parsed
+ * {@link Test1Pojo} instances are expected on the output port and rejected input strings on
+ * the error port.
+ */
+public class JsonParserTest
+{
+  JsonParser operator;
+  CollectorTestSink<Object> validDataSink;
+  CollectorTestSink<String> invalidDataSink;
+
+  final ByteArrayOutputStream myOut = new ByteArrayOutputStream();
+
+  /**
+   * The {@code System.err} stream that was installed when this test instance was created;
+   * captured (field initializers run before the constructor body) so it can be put back once
+   * the test has finished.
+   */
+  private final PrintStream originalErr = System.err;
+
+  public JsonParserTest()
+  {
+    // So that the output is cleaner.
+    System.setErr(new PrintStream(myOut));
+  }
+
+  @Rule
+  public TestInfo testMeta = new FSTestWatcher()
+  {
+    /** Removes the directory reported by {@code getDir()}, wrapping I/O failures. */
+    private void deleteDirectory()
+    {
+      try {
+        FileUtils.deleteDirectory(new File(getDir()));
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    /**
+     * Starts from a clean directory, then creates the operator, wires both sinks, and runs
+     * setup / activate / beginWindow(0) so tests can put tuples straight away.
+     */
+    @Override
+    protected void starting(Description descriptor)
+    {
+
+      super.starting(descriptor);
+      deleteDirectory();
+
+      operator = new JsonParser();
+      operator.setClazz(Test1Pojo.class);
+      validDataSink = new CollectorTestSink<Object>();
+      invalidDataSink = new CollectorTestSink<String>();
+      TestUtils.setSink(operator.out, validDataSink);
+      TestUtils.setSink(operator.err, invalidDataSink);
+      operator.setup(null);
+      operator.activate(null);
+
+      operator.beginWindow(0);
+    }
+
+    /**
+     * Ends the window, tears the operator down and cleans the directory. The redirected
+     * {@code System.err} is restored in a {@code finally} so the redirect never leaks into
+     * tests that run later in the same JVM, even if the shutdown itself throws.
+     */
+    @Override
+    protected void finished(Description description)
+    {
+      try {
+        operator.endWindow();
+        operator.teardown();
+
+        deleteDirectory();
+        super.finished(description);
+      } finally {
+        System.setErr(originalErr);
+      }
+    }
+  };
+
+  /**
+   * Asserts that exactly one tuple was emitted on the output port, none on the error port,
+   * and that the emitted tuple is a {@link Test1Pojo}.
+   *
+   * @return the emitted POJO
+   */
+  private Test1Pojo expectSingleParsedPojo()
+  {
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    Object obj = validDataSink.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(Test1Pojo.class, obj.getClass());
+    return (Test1Pojo)obj;
+  }
+
+  /**
+   * Puts {@code tuple} on the input port and asserts that nothing was emitted on the output
+   * port and that the unchanged input string was emitted on the error port.
+   *
+   * @param tuple the input expected to be rejected
+   */
+  private void expectRejected(String tuple)
+  {
+    operator.in.put(tuple);
+    Assert.assertEquals(0, validDataSink.collectedTuples.size());
+    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
+    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
+  }
+
+  /** Asserts the {@code d} list holds exactly ABC, PQR, XYZ in that order. */
+  private static void assertAbcPqrXyz(List<String> d)
+  {
+    Assert.assertEquals(3, d.size());
+    Assert.assertEquals("ABC", d.get(0));
+    Assert.assertEquals("PQR", d.get(1));
+    Assert.assertEquals("XYZ", d.get(2));
+  }
+
+  /** Well-formed JSON with matching field types is mapped onto the POJO. */
+  @Test
+  public void testJSONToPOJO()
+  {
+    String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}";
+    operator.in.put(tuple);
+    Test1Pojo pojo = expectSingleParsedPojo();
+    Assert.assertEquals(123, pojo.a);
+    Assert.assertEquals(234876274, pojo.b);
+    Assert.assertEquals("HowAreYou?", pojo.c);
+    assertAbcPqrXyz(pojo.d);
+  }
+
+  /** A date string is parsed with the configured {@code dd-MM-yyyy} format. */
+  @Test
+  public void testJSONToPOJODate()
+  {
+    String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}";
+    operator.setDateFormat("dd-MM-yyyy");
+    // The rule already ran setup/activate; they are run again here, presumably so that the
+    // date format set above is picked up - TODO confirm against JsonParser.
+    operator.setup(null);
+    operator.activate(null);
+    operator.in.put(tuple);
+    Test1Pojo pojo = expectSingleParsedPojo();
+    Assert.assertEquals(123, pojo.a);
+    Assert.assertEquals(234876274, pojo.b);
+    Assert.assertEquals("HowAreYou?", pojo.c);
+    assertAbcPqrXyz(pojo.d);
+    DateTime parsed = new DateTime(pojo.date);
+    Assert.assertEquals(2015, parsed.getYear());
+    Assert.assertEquals(9, parsed.getMonthOfYear());
+    Assert.assertEquals(15, parsed.getDayOfMonth());
+  }
+
+  /** Malformed JSON (no comma between the first two members) goes to the error port. */
+  @Test
+  public void testJSONToPOJOInvalidData()
+  {
+    expectRejected("{\"a\":123\"b\":234876274,\"c\":\"HowAreYou?\"}");
+  }
+
+  /** A member with no matching POJO field ({@code asd}) does not cause a rejection. */
+  @Test
+  public void testJSONToPOJOUnknownFields()
+  {
+    String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"asd\":433.6}";
+    operator.in.put(tuple);
+    Test1Pojo pojo = expectSingleParsedPojo();
+    Assert.assertEquals(123, pojo.a);
+    Assert.assertEquals(234876274, pojo.b);
+    Assert.assertEquals("HowAreYou?", pojo.c);
+    Assert.assertNull(pojo.d);
+  }
+
+  /** Values for {@code b} and {@code c} are swapped relative to the POJO's field types. */
+  @Test
+  public void testJSONToPOJOMismatchingFields()
+  {
+    expectRejected("{\"a\":123,\"c\":234876274,\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}");
+  }
+
+  /** An empty input string goes to the error port. */
+  @Test
+  public void testJSONToPOJOEmptyString()
+  {
+    expectRejected("");
+  }
+
+  /** An empty JSON object yields a POJO whose fields all keep their default values. */
+  @Test
+  public void testJSONToPOJOEmptyJSON()
+  {
+    String tuple = "{}";
+    operator.in.put(tuple);
+    Test1Pojo pojo = expectSingleParsedPojo();
+    Assert.assertEquals(0, pojo.a);
+    Assert.assertEquals(0, pojo.b);
+    Assert.assertNull(pojo.c);
+    Assert.assertNull(pojo.d);
+  }
+
+  /** {@code c} is given an array and {@code b} a string; the tuple is expected to be rejected. */
+  @Test
+  public void testJSONToPOJOArrayInJson()
+  {
+    expectRejected("{\"a\":123,\"c\":[234,65,23],\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}");
+  }
+
+  /** Target POJO for the parser; public fields are read directly by the assertions. */
+  public static class Test1Pojo
+  {
+    public int a;
+    public long b;
+    public String c;
+    public List<String> d;
+    public Date date;
+
+    @Override
+    public String toString()
+    {
+      return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + ", date=" + date + "]";
+    }
+  }
+}
