Repository: apex-malhar
Updated Branches:
  refs/heads/master ef8e64ffe -> eaa3bf3b9


APEXMALHAR-2424 Extra null field getting added to columnDataTypes


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/8d55cb2a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/8d55cb2a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/8d55cb2a

Branch: refs/heads/master
Commit: 8d55cb2a66e1ab37a3dccd00c0f6baf422ba1902
Parents: ec7b480
Author: Hitesh-Scorpio <[email protected]>
Authored: Fri Feb 24 15:10:06 2017 +0530
Committer: Hitesh-Scorpio <[email protected]>
Committed: Mon Feb 27 13:48:55 2017 +0530

----------------------------------------------------------------------
 .../lib/db/jdbc/JdbcPOJOPollInputOperator.java  |  6 +-
 .../db/jdbc/JdbcPojoPollableOpeartorTest.java   | 90 ++++++++++++++++++++
 2 files changed, 94 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8d55cb2a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
index 129981a..f96c6e1 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
@@ -201,7 +201,9 @@ public class JdbcPOJOPollInputOperator extends AbstractJdbcPollInputOperator<Obj
       }
 
       for (FieldInfo fieldInfo : fieldInfos) {
-        columnDataTypes.add(nameToType.get(fieldInfo.getColumnName().toUpperCase()));
+        if (nameToType.containsKey(fieldInfo.getColumnName().toUpperCase())) {
+          columnDataTypes.add(nameToType.get(fieldInfo.getColumnName().toUpperCase()));
+        }
       }
     }
   }
@@ -219,7 +221,7 @@ public class JdbcPOJOPollInputOperator extends AbstractJdbcPollInputOperator<Obj
     }
 
     try {
-      for (int i = 0; i < fieldInfos.size(); i++) {
+      for (int i = 0; i < columnDataTypes.size(); i++) {
         int type = columnDataTypes.get(i);
         ActiveFieldInfo afi = columnFieldSetters.get(i);
         switch (type) {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8d55cb2a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
index 0f9a7c9..d90f2c1 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
@@ -227,6 +227,96 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest
 
   }
 
+  @Test
+  public void testDBPollerExtraField() throws InterruptedException
+  {
+    insertEvents(10, true, 0);
+
+    JdbcStore store = new JdbcStore();
+    store.setDatabaseDriver(DB_DRIVER);
+    store.setDatabaseUrl(URL);
+
+    List<FieldInfo> fieldInfos = getFieldInfos();
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+    TestPortContext tpc = new TestPortContext(portAttributes);
+
+    JdbcPOJOPollInputOperator inputOperator = new JdbcPOJOPollInputOperator();
+    inputOperator.setStore(store);
+    inputOperator.setTableName(TABLE_POJO_NAME);
+    inputOperator.setColumnsExpression("ID,STARTDATE,STARTTIME,STARTTIMESTAMP");
+    inputOperator.setKey("id");
+    inputOperator.setFieldInfos(fieldInfos);
+    inputOperator.setFetchSize(100);
+    inputOperator.setBatchSize(100);
+    inputOperator.setPartitionCount(2);
+
+    Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> newPartitions = inputOperator
+        .definePartitions(new ArrayList<Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>>(), null);
+
+    int operatorId = 0;
+    for (com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>> partition : newPartitions) {
+
+      Attribute.AttributeMap.DefaultAttributeMap partitionAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+      partitionAttributeMap.put(DAG.APPLICATION_ID, APP_ID);
+      partitionAttributeMap.put(Context.DAGContext.APPLICATION_PATH, dir);
+
+      OperatorContextTestHelper.TestIdOperatorContext partitioningContext = new OperatorContextTestHelper.TestIdOperatorContext(
+          operatorId++, partitionAttributeMap);
+
+      JdbcPOJOPollInputOperator parition = (JdbcPOJOPollInputOperator)partition.getPartitionedInstance();
+      parition.outputPort.setup(tpc);
+      parition.setScheduledExecutorService(mockscheduler);
+      parition.setup(partitioningContext);
+      parition.activate(partitioningContext);
+    }
+
+    Iterator<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> itr = newPartitions
+        .iterator();
+    // First partition is for range queries,last is for polling queries
+    JdbcPOJOPollInputOperator firstInstance = (JdbcPOJOPollInputOperator)itr.next().getPartitionedInstance();
+    CollectorTestSink<Object> sink1 = new CollectorTestSink<>();
+    firstInstance.outputPort.setSink(sink1);
+    firstInstance.beginWindow(0);
+    firstInstance.pollRecords();
+    firstInstance.pollRecords();
+    firstInstance.emitTuples();
+    firstInstance.endWindow();
+
+    Assert.assertEquals("rows from db", 5, sink1.collectedTuples.size());
+    for (Object tuple : sink1.collectedTuples) {
+      TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple;
+      Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date);
+      Assert.assertTrue("date", pojoEvent.getId() < 5);
+    }
+
+    JdbcPOJOPollInputOperator secondInstance = (JdbcPOJOPollInputOperator)itr.next().getPartitionedInstance();
+    CollectorTestSink<Object> sink2 = new CollectorTestSink<>();
+    secondInstance.outputPort.setSink(sink2);
+    secondInstance.beginWindow(0);
+    secondInstance.pollRecords();
+    secondInstance.emitTuples();
+    secondInstance.endWindow();
+
+    Assert.assertEquals("rows from db", 5, sink2.collectedTuples.size());
+    for (Object tuple : sink2.collectedTuples) {
+      TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple;
+      Assert.assertTrue("date", pojoEvent.getId() < 10 && pojoEvent.getId() >= 5);
+    }
+
+    insertEvents(4, false, 10);
+    JdbcPOJOPollInputOperator thirdInstance = (JdbcPOJOPollInputOperator)itr.next().getPartitionedInstance();
+    CollectorTestSink<Object> sink3 = new CollectorTestSink<>();
+    thirdInstance.outputPort.setSink(sink3);
+    thirdInstance.beginWindow(0);
+    thirdInstance.pollRecords();
+    thirdInstance.emitTuples();
+    thirdInstance.endWindow();
+
+    Assert.assertEquals("rows from db", 4, sink3.collectedTuples.size());
+  }
+
   private List<FieldInfo> getFieldInfos()
   {
     List<FieldInfo> fieldInfos = Lists.newArrayList();

Reply via email to