Repository: apex-malhar Updated Branches: refs/heads/master d06b2d987 -> a185fef04
APEXMALHAR-2157 Json formatter improvements Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9e9fe76f Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9e9fe76f Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9e9fe76f Branch: refs/heads/master Commit: 9e9fe76fefb8f5436dfd9998181802d42e98cad6 Parents: aaa4464 Author: shubham <[email protected]> Authored: Mon Jul 18 14:46:46 2016 +0530 Committer: shubham <[email protected]> Committed: Mon Jul 25 17:04:05 2016 +0530 ---------------------------------------------------------------------- library/pom.xml | 5 + .../datatorrent/lib/formatter/Formatter.java | 40 +++++ .../lib/formatter/JsonFormatter.java | 62 ++----- .../lib/formatter/JsonFormatterTest.java | 162 +++++++++++++------ 4 files changed, 167 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9e9fe76f/library/pom.xml ---------------------------------------------------------------------- diff --git a/library/pom.xml b/library/pom.xml index e9f64c8..dae1a38 100644 --- a/library/pom.xml +++ b/library/pom.xml @@ -351,6 +351,11 @@ <version>1.10.73</version> <scope>test</scope> </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>2.5.4</version> + </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9e9fe76f/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java b/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java index 25c0b96..db8dbc4 100644 --- a/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java +++ b/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java @@ -18,6 +18,9 @@ */ package com.datatorrent.lib.formatter; +import com.google.common.annotations.VisibleForTesting; + +import com.datatorrent.api.AutoMetric; import com.datatorrent.api.Context; import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DefaultInputPort; @@ -48,6 +51,13 @@ public abstract class Formatter<OUTPUT> extends BaseOperator implements Converte { protected transient Class<?> clazz; + @AutoMetric + private long errorTupleCount; + @AutoMetric + private long emittedObjectCount; + @AutoMetric + private long incomingTuplesCount; + @OutputPortFieldAnnotation public transient DefaultOutputPort<OUTPUT> out = new DefaultOutputPort<OUTPUT>(); @@ -65,17 +75,28 @@ public abstract class Formatter<OUTPUT> extends BaseOperator implements Converte @Override public void process(Object inputTuple) { + incomingTuplesCount++; OUTPUT tuple = convert(inputTuple); if (tuple == null && err.isConnected()) { + errorTupleCount++; err.emit(inputTuple); return; } if (out.isConnected()) { + emittedObjectCount++; out.emit(tuple); } } }; + @Override + public void beginWindow(long windowId) + { + errorTupleCount = 0; + emittedObjectCount = 0; + incomingTuplesCount = 0; + } + /** * Get the class that needs to be formatted * @@ -95,4 +116,23 @@ public abstract class Formatter<OUTPUT> extends BaseOperator implements Converte { this.clazz = clazz; } + + @VisibleForTesting + protected long getErrorTupleCount() + { + return errorTupleCount; + } + + @VisibleForTesting + protected long getEmittedObjectCount() + { + return emittedObjectCount; + } + + @VisibleForTesting + protected long getIncomingTuplesCount() + { + return incomingTuplesCount; + } + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9e9fe76f/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java b/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java index fa17fda..840b550 100644 --- a/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java +++ b/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java @@ -16,26 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -package com.datatorrent.lib.formatter; -import java.io.IOException; -import java.text.SimpleDateFormat; +package com.datatorrent.lib.formatter; -import org.codehaus.jackson.JsonGenerationException; -import org.codehaus.jackson.map.JsonMappingException; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.ObjectWriter; -import org.codehaus.jackson.map.SerializationConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.netlet.util.DTThrowable; /** * Operator that converts POJO to JSON string <br> - * <b>Properties</b> <br> - * <b>dateFormat</b>: date format e.g dd/MM/yyyy * * @displayName JsonFormatter * @category Formatter @@ -45,58 +38,27 @@ import com.datatorrent.netlet.util.DTThrowable; @org.apache.hadoop.classification.InterfaceStability.Evolving public class JsonFormatter extends Formatter<String> { - private transient ObjectWriter writer; - protected String dateFormat; + private transient ObjectMapper objMapper; @Override public void setup(OperatorContext context) { - try { - ObjectMapper mapper = new ObjectMapper(); - if (dateFormat != null) { - mapper.setDateFormat(new SimpleDateFormat(dateFormat)); - } - writer = mapper.writerWithType(clazz); - mapper.configure(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true); - mapper.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS, true); - mapper.configure(SerializationConfig.Feature.AUTO_DETECT_IS_GETTERS, true); - } catch (Throwable e) { - throw new RuntimeException("Unable find provided class"); - } + objMapper = new ObjectMapper(); } @Override public String convert(Object tuple) { + if (tuple == null) { + return null; + } try { - return writer.writeValueAsString(tuple); - } catch (JsonGenerationException | JsonMappingException e) { - logger.debug("Error while converting tuple {} {}",tuple,e.getMessage()); - } catch (IOException e) { - DTThrowable.rethrow(e); + return objMapper.writeValueAsString(tuple); + } catch (JsonProcessingException e) { + logger.error("Error while converting tuple {} {}", tuple, e); } return null; } - /** - * Get the date format - * - * @return Date format string - */ - public String getDateFormat() - { - return dateFormat; - } - - /** - * Set the date format - * - * @param dateFormat - */ - public void setDateFormat(String dateFormat) - { - this.dateFormat = dateFormat; - } - private static final Logger logger = LoggerFactory.getLogger(JsonFormatter.class); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9e9fe76f/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java b/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java index 98be88c..126639c 100644 --- a/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java +++ b/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java @@ -22,21 +22,34 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.PrintStream; +import java.util.ArrayList; import java.util.Date; import java.util.List; +import javax.validation.ConstraintViolationException; + import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.runner.Description; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Lists; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher; import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.lib.util.TestUtils; @@ -99,73 +112,69 @@ public class JsonFormatterTest @Test public void testJSONToPOJO() { - Test1Pojo pojo = new Test1Pojo(); - pojo.a = 123; - pojo.b = 234876274; - pojo.c = "HowAreYou?"; - pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ"); - + Ad pojo = new Ad(); + pojo.adId = 123; + pojo.campaignId = 234876274; + pojo.description = "sports"; + pojo.sizes = Lists.newArrayList("200x350", "600x800"); + pojo.startDate = new DateTime().withDate(2016, 1, 1).withMillisOfDay(0).withZoneRetainFields(DateTimeZone.UTC) + .toDate(); + pojo.endDate = new DateTime().withDate(2016, 2, 1).withMillisOfDay(0).withZoneRetainFields(DateTimeZone.UTC) + .toDate(); operator.in.put(pojo); Assert.assertEquals(1, validDataSink.collectedTuples.size()); Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":null}"; - Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0)); - } - - @Test - public void testJSONToPOJODate() - { - Test1Pojo pojo = new Test1Pojo(); - pojo.a = 123; - pojo.b = 234876274; - pojo.c = "HowAreYou?"; - pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ"); - pojo.date = new DateTime().withYear(2015).withMonthOfYear(9).withDayOfMonth(15).toDate(); - operator.setDateFormat("dd-MM-yyyy"); - operator.setup(null); - operator.in.put(pojo); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}"; + String expectedJSONString = "{\"adId\":123,\"campaignId\":234876274,\"sizes\":[\"200x350\",\"600x800\"],\"startDate\":\"Fri, 1 Jan 2016 00:00:00\",\"endDate\":\"01-Feb-2016\",\"desc\":\"sports\"}"; Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0)); + Assert.assertEquals(1, operator.getIncomingTuplesCount()); + Assert.assertEquals(1, operator.getEmittedObjectCount()); + Assert.assertEquals(0, operator.getErrorTupleCount()); } @Test public void testJSONToPOJONullFields() { - Test1Pojo pojo = new Test1Pojo(); - pojo.a = 123; - pojo.b = 234876274; - pojo.c = "HowAreYou?"; - pojo.d = null; + Ad pojo = new Ad(); + pojo.adId = 123; + pojo.campaignId = 234876274; + pojo.description = "sports"; + pojo.sizes = null; + pojo.startDate = null; + pojo.endDate = null; operator.in.put(pojo); Assert.assertEquals(1, validDataSink.collectedTuples.size()); Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":null,\"date\":null}"; + String expectedJSONString = "{\"adId\":123,\"campaignId\":234876274,\"sizes\":null,\"startDate\":null,\"endDate\":null,\"desc\":\"sports\"}"; Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0)); + Assert.assertEquals(1, operator.getIncomingTuplesCount()); + Assert.assertEquals(1, operator.getEmittedObjectCount()); + Assert.assertEquals(0, operator.getErrorTupleCount()); } @Test public void testJSONToPOJOEmptyPOJO() { - Test1Pojo pojo = new Test1Pojo(); + Ad pojo = new Ad(); operator.in.put(pojo); Assert.assertEquals(1, validDataSink.collectedTuples.size()); Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - String expectedJSONString = "{\"a\":0,\"b\":0,\"c\":null,\"d\":null,\"date\":null}"; - LOG.debug("{}", validDataSink.collectedTuples.get(0)); + String expectedJSONString = "{\"adId\":0,\"campaignId\":0,\"sizes\":null,\"startDate\":null,\"endDate\":null,\"desc\":null}"; Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0)); + Assert.assertEquals(1, operator.getIncomingTuplesCount()); + Assert.assertEquals(1, operator.getEmittedObjectCount()); + Assert.assertEquals(0, operator.getErrorTupleCount()); } @Test public void testJSONToPOJONullPOJO() { operator.in.put(null); - Assert.assertEquals(1, validDataSink.collectedTuples.size()); - Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); - String expectedJSONString = "null"; - Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0)); + Assert.assertEquals(0, validDataSink.collectedTuples.size()); + Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); + Assert.assertEquals(1, operator.getIncomingTuplesCount()); + Assert.assertEquals(0, operator.getEmittedObjectCount()); + Assert.assertEquals(1, operator.getErrorTupleCount()); } @Test @@ -173,36 +182,85 @@ public class JsonFormatterTest { operator.endWindow(); operator.teardown(); - operator.setClazz(Test2Pojo.class); operator.setup(null); operator.beginWindow(1); - Test2Pojo o = new Test2Pojo(); + TestPojo o = new TestPojo(); operator.in.put(o); Assert.assertEquals(0, validDataSink.collectedTuples.size()); Assert.assertEquals(1, invalidDataSink.collectedTuples.size()); Assert.assertEquals(o, invalidDataSink.collectedTuples.get(0)); + Assert.assertEquals(1, operator.getIncomingTuplesCount()); + Assert.assertEquals(0, operator.getEmittedObjectCount()); + Assert.assertEquals(1, operator.getErrorTupleCount()); + } + + @Test + public void testApplication() throws IOException, Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new JsonFormatterApplication(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(2000);// runs for 2 seconds and quits + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + + public static class JsonFormatterApplication implements StreamingApplication + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + PojoEmitter input = dag.addOperator("data", new PojoEmitter()); + JsonFormatter formatter = dag.addOperator("formatter", new JsonFormatter()); + dag.getMeta(formatter).getMeta(formatter.in).getAttributes().put(Context.PortContext.TUPLE_CLASS, Ad.class); + ConsoleOutputOperator output = dag.addOperator("output", new ConsoleOutputOperator()); + output.setDebug(true); + dag.addStream("input", input.output, formatter.in); + dag.addStream("output", formatter.out, output.input); + } } - public static class Test1Pojo + public static class PojoEmitter extends BaseOperator implements InputOperator { - public int a; - public long b; - public String c; - public List<String> d; - public Date date; + public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>(); @Override - public String toString() + public void emitTuples() { - return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + ", date=" + date + "]"; + Ad test1Pojo = new Ad(); + test1Pojo.adId = 1234; + test1Pojo.campaignId = 2319483L; + test1Pojo.description = "ad"; + test1Pojo.sizes = new ArrayList<String>(); + test1Pojo.sizes.add("250x350"); + test1Pojo.sizes.add("800x600"); + test1Pojo.startDate = new DateTime().withDate(2016, 1, 1).withHourOfDay(0).withMinuteOfHour(0) + .withSecondOfMinute(0).toDate(); + test1Pojo.endDate = new DateTime().withDate(2016, 2, 1).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0) + .withZone(DateTimeZone.UTC).toDate(); + output.emit(test1Pojo); } } - public static class Test2Pojo + public static class Ad { + public int adId; + public long campaignId; + @JsonProperty("desc") + public String description; + public List<String> sizes; + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "EEE, d MMM yyyy HH:mm:ss") + public Date startDate; + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MMM-yyyy") + public Date endDate; } - private static final Logger LOG = LoggerFactory.getLogger(JsonFormatterTest.class); + public static class TestPojo + { + } }
