Repository: apex-malhar Updated Branches: refs/heads/master ef8e64ffe -> eaa3bf3b9
APEXMALHAR-2424 Extra null field getting added to columnDataTypes Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/8d55cb2a Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/8d55cb2a Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/8d55cb2a Branch: refs/heads/master Commit: 8d55cb2a66e1ab37a3dccd00c0f6baf422ba1902 Parents: ec7b480 Author: Hitesh-Scorpio <[email protected]> Authored: Fri Feb 24 15:10:06 2017 +0530 Committer: Hitesh-Scorpio <[email protected]> Committed: Mon Feb 27 13:48:55 2017 +0530 ---------------------------------------------------------------------- .../lib/db/jdbc/JdbcPOJOPollInputOperator.java | 6 +- .../db/jdbc/JdbcPojoPollableOpeartorTest.java | 90 ++++++++++++++++++++ 2 files changed, 94 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8d55cb2a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java index 129981a..f96c6e1 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java @@ -201,7 +201,9 @@ public class JdbcPOJOPollInputOperator extends AbstractJdbcPollInputOperator<Obj } for (FieldInfo fieldInfo : fieldInfos) { - columnDataTypes.add(nameToType.get(fieldInfo.getColumnName().toUpperCase())); + if (nameToType.containsKey(fieldInfo.getColumnName().toUpperCase())) { + columnDataTypes.add(nameToType.get(fieldInfo.getColumnName().toUpperCase())); + } } } } @@ -219,7 +221,7 @@ public class JdbcPOJOPollInputOperator extends AbstractJdbcPollInputOperator<Obj } try { - for (int i = 0; i < fieldInfos.size(); i++) { + for (int i = 0; i < columnDataTypes.size(); i++) { int type = columnDataTypes.get(i); ActiveFieldInfo afi = columnFieldSetters.get(i); switch (type) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8d55cb2a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java index 0f9a7c9..d90f2c1 100644 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java @@ -227,6 +227,96 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest } + @Test + public void testDBPollerExtraField() throws InterruptedException + { + insertEvents(10, true, 0); + + JdbcStore store = new JdbcStore(); + store.setDatabaseDriver(DB_DRIVER); + store.setDatabaseUrl(URL); + + List<FieldInfo> fieldInfos = getFieldInfos(); + + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + TestPortContext tpc = new TestPortContext(portAttributes); + + JdbcPOJOPollInputOperator inputOperator = new JdbcPOJOPollInputOperator(); + inputOperator.setStore(store); + inputOperator.setTableName(TABLE_POJO_NAME); + inputOperator.setColumnsExpression("ID,STARTDATE,STARTTIME,STARTTIMESTAMP"); + inputOperator.setKey("id"); + inputOperator.setFieldInfos(fieldInfos); + inputOperator.setFetchSize(100); + inputOperator.setBatchSize(100); + inputOperator.setPartitionCount(2); + + Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> newPartitions = inputOperator + .definePartitions(new ArrayList<Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>>(), null); + + int operatorId = 0; + for (com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>> partition : newPartitions) { + + Attribute.AttributeMap.DefaultAttributeMap partitionAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); + partitionAttributeMap.put(DAG.APPLICATION_ID, APP_ID); + partitionAttributeMap.put(Context.DAGContext.APPLICATION_PATH, dir); + + OperatorContextTestHelper.TestIdOperatorContext partitioningContext = new OperatorContextTestHelper.TestIdOperatorContext( + operatorId++, partitionAttributeMap); + + JdbcPOJOPollInputOperator parition = (JdbcPOJOPollInputOperator)partition.getPartitionedInstance(); + parition.outputPort.setup(tpc); + parition.setScheduledExecutorService(mockscheduler); + parition.setup(partitioningContext); + parition.activate(partitioningContext); + } + + Iterator<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> itr = newPartitions + .iterator(); + // First partition is for range queries,last is for polling queries + JdbcPOJOPollInputOperator firstInstance = (JdbcPOJOPollInputOperator)itr.next().getPartitionedInstance(); + CollectorTestSink<Object> sink1 = new CollectorTestSink<>(); + firstInstance.outputPort.setSink(sink1); + firstInstance.beginWindow(0); + firstInstance.pollRecords(); + firstInstance.pollRecords(); + firstInstance.emitTuples(); + firstInstance.endWindow(); + + Assert.assertEquals("rows from db", 5, sink1.collectedTuples.size()); + for (Object tuple : sink1.collectedTuples) { + TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple; + Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date); + Assert.assertTrue("date", pojoEvent.getId() < 5); + } + + JdbcPOJOPollInputOperator secondInstance = (JdbcPOJOPollInputOperator)itr.next().getPartitionedInstance(); + CollectorTestSink<Object> sink2 = new CollectorTestSink<>(); + secondInstance.outputPort.setSink(sink2); + secondInstance.beginWindow(0); + secondInstance.pollRecords(); + secondInstance.emitTuples(); + secondInstance.endWindow(); + + Assert.assertEquals("rows from db", 5, sink2.collectedTuples.size()); + for (Object tuple : sink2.collectedTuples) { + TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple; + Assert.assertTrue("date", pojoEvent.getId() < 10 && pojoEvent.getId() >= 5); + } + + insertEvents(4, false, 10); + JdbcPOJOPollInputOperator thirdInstance = (JdbcPOJOPollInputOperator)itr.next().getPartitionedInstance(); + CollectorTestSink<Object> sink3 = new CollectorTestSink<>(); + thirdInstance.outputPort.setSink(sink3); + thirdInstance.beginWindow(0); + thirdInstance.pollRecords(); + thirdInstance.emitTuples(); + thirdInstance.endWindow(); + + Assert.assertEquals("rows from db", 4, sink3.collectedTuples.size()); + } + private List<FieldInfo> getFieldInfos() { List<FieldInfo> fieldInfos = Lists.newArrayList();
