Repository: apex-malhar
Updated Branches:
  refs/heads/master 3316d6a78 -> 26fa9d781


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java 
b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
index 6c7e7d4..1ffe256 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
@@ -22,48 +22,28 @@ import java.sql.Connection;
 import java.sql.Date;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Time;
 import java.sql.Timestamp;
-import java.sql.Types;
-import java.util.List;
 
-import javax.annotation.Nonnull;
-
-import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.api.Attribute;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.lib.helper.OperatorContextTestHelper;
-import com.datatorrent.lib.helper.TestPortContext;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.FieldInfo;
-import com.datatorrent.lib.util.TestUtils;
+
 import com.datatorrent.netlet.util.DTThrowable;
 
-/**
- * Tests for {@link AbstractJdbcTransactionableOutputOperator} and {@link 
AbstractJdbcInputOperator}
- */
 public class JdbcOperatorTest
 {
   public static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
   public static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
 
-  private static final String TABLE_NAME = "test_event_table";
-  private static final String TABLE_POJO_NAME = "test_pojo_event_table";
-  private static final String TABLE_POJO_NAME_ID_DIFF = 
"test_pojo_event_table_id_diff";
-  private static final String TABLE_POJO_NAME_NAME_DIFF = 
"test_pojo_event_table_name_diff";
-  private static String APP_ID = "JdbcOperatorTest";
-  private static int OPERATOR_ID = 0;
+  protected static final String TABLE_NAME = "test_event_table";
+  protected static final String TABLE_POJO_NAME = "test_pojo_event_table";
+  protected static final String TABLE_POJO_NAME_ID_DIFF = 
"test_pojo_event_table_id_diff";
+  protected static final String TABLE_POJO_NAME_NAME_DIFF = 
"test_pojo_event_table_name_diff";
+  protected static String APP_ID = "JdbcOperatorTest";
+  protected static int OPERATOR_ID = 0;
 
-  private static class TestEvent
+  public static class TestEvent
   {
     int id;
 
@@ -151,6 +131,15 @@ public class JdbcOperatorTest
     {
       this.score = score;
     }
+
+    @Override
+    public String toString()
+    {
+      return "TestPOJOEvent [id=" + id + ", name=" + name + ", startDate=" + 
startDate + ", startTime=" + startTime
+          + ", startTimestamp=" + startTimestamp + ", score=" + score + "]";
+    }
+
+
   }
 
   @BeforeClass
@@ -162,25 +151,18 @@ public class JdbcOperatorTest
       Connection con = DriverManager.getConnection(URL);
       Statement stmt = con.createStatement();
 
-      String createMetaTable = "CREATE TABLE IF NOT EXISTS " + 
JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
-          + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT 
NULL, "
-          + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
-          + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
-          + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
-          + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + 
JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
-          + ")";
+      String createMetaTable = "CREATE TABLE IF NOT EXISTS " + 
JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " + 
JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " + 
JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " + 
JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " + "UNIQUE (" + 
JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + 
JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + 
JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " + ")";
       stmt.executeUpdate(createMetaTable);
 
       String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (ID 
INTEGER)";
       stmt.executeUpdate(createTable);
       String createPOJOTable = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME
           + "(id INTEGER not NULL,name VARCHAR(255),startDate DATE,startTime 
TIME,startTimestamp TIMESTAMP, score DOUBLE, PRIMARY KEY ( id ))";
+
       stmt.executeUpdate(createPOJOTable);
-      String createPOJOTableIdDiff = "CREATE TABLE IF NOT EXISTS " + 
TABLE_POJO_NAME_ID_DIFF
-          + "(id1 INTEGER not NULL,name VARCHAR(255), PRIMARY KEY ( id1 ))";
+      String createPOJOTableIdDiff = "CREATE TABLE IF NOT EXISTS " + 
TABLE_POJO_NAME_ID_DIFF + "(id1 INTEGER not NULL,name VARCHAR(255), PRIMARY KEY 
( id1 ))";
       stmt.executeUpdate(createPOJOTableIdDiff);
-      String createPOJOTableNameDiff = "CREATE TABLE IF NOT EXISTS " + 
TABLE_POJO_NAME_NAME_DIFF
-          + "(id INTEGER not NULL,name1 VARCHAR(255), PRIMARY KEY ( id ))";
+      String createPOJOTableNameDiff = "CREATE TABLE IF NOT EXISTS " + 
TABLE_POJO_NAME_NAME_DIFF + "(id INTEGER not NULL,name1 VARCHAR(255), PRIMARY 
KEY ( id ))";
       stmt.executeUpdate(createPOJOTableNameDiff);
     } catch (Throwable e) {
       DTThrowable.rethrow(e);
@@ -222,569 +204,7 @@ public class JdbcOperatorTest
     }
   }
 
-  private static class TestOutputOperator extends 
AbstractJdbcTransactionableOutputOperator<TestEvent>
-  {
-    private static final String INSERT_STMT = "INSERT INTO " + TABLE_NAME + " 
values (?)";
-
-    TestOutputOperator()
-    {
-      cleanTable();
-    }
-
-    @Nonnull
-    @Override
-    protected String getUpdateCommand()
-    {
-      return INSERT_STMT;
-    }
-
-    @Override
-    protected void setStatementParameters(PreparedStatement statement, 
TestEvent tuple) throws SQLException
-    {
-      statement.setInt(1, tuple.id);
-    }
-
-    public int getNumOfEventsInStore()
-    {
-      Connection con;
-      try {
-        con = DriverManager.getConnection(URL);
-        Statement stmt = con.createStatement();
-
-        String countQuery = "SELECT count(*) from " + TABLE_NAME;
-        ResultSet resultSet = stmt.executeQuery(countQuery);
-        resultSet.next();
-        return resultSet.getInt(1);
-      } catch (SQLException e) {
-        throw new RuntimeException("fetching count", e);
-      }
-    }
-  }
-
-  private static class TestPOJOOutputOperator extends 
JdbcPOJOInsertOutputOperator
-  {
-    TestPOJOOutputOperator()
-    {
-      cleanTable();
-    }
-
-    public int getNumOfEventsInStore(String tableName)
-    {
-      Connection con;
-      try {
-        con = DriverManager.getConnection(URL);
-        Statement stmt = con.createStatement();
-
-        String countQuery = "SELECT count(*) from " + tableName;
-        ResultSet resultSet = stmt.executeQuery(countQuery);
-        resultSet.next();
-        return resultSet.getInt(1);
-      } catch (SQLException e) {
-        throw new RuntimeException("fetching count", e);
-      }
-    }
-
-    public int getNumOfNullEventsInStore(String tableName)
-    {
-      Connection con;
-      try {
-        con = DriverManager.getConnection(URL);
-        Statement stmt = con.createStatement();
-
-        String countQuery = "SELECT count(*) from " + tableName + " where 
name1 is null";
-        ResultSet resultSet = stmt.executeQuery(countQuery);
-        resultSet.next();
-        return resultSet.getInt(1);
-      } catch (SQLException e) {
-        throw new RuntimeException("fetching count", e);
-      }
-    }
-
-  }
-
-  private static class TestPOJONonInsertOutputOperator extends 
JdbcPOJONonInsertOutputOperator
-  {
-    public TestPOJONonInsertOutputOperator()
-    {
-      cleanTable();
-    }
-
-    public int getNumOfEventsInStore()
-    {
-      Connection con;
-      try {
-        con = DriverManager.getConnection(URL);
-        Statement stmt = con.createStatement();
-
-        String countQuery = "SELECT count(*) from " + TABLE_POJO_NAME;
-        ResultSet resultSet = stmt.executeQuery(countQuery);
-        resultSet.next();
-        return resultSet.getInt(1);
-      } catch (SQLException e) {
-        throw new RuntimeException("fetching count", e);
-      }
-    }
-
-    public int getDistinctNonUnique()
-    {
-      Connection con;
-      try {
-        con = DriverManager.getConnection(URL);
-        Statement stmt = con.createStatement();
-
-        String countQuery = "SELECT count(distinct(name)) from " + 
TABLE_POJO_NAME;
-        ResultSet resultSet = stmt.executeQuery(countQuery);
-        resultSet.next();
-        return resultSet.getInt(1);
-      } catch (SQLException e) {
-        throw new RuntimeException("fetching count", e);
-      }
-    }
-  }
-
-  private static class TestInputOperator extends 
AbstractJdbcInputOperator<TestEvent>
-  {
-
-    private static final String retrieveQuery = "SELECT * FROM " + TABLE_NAME;
-
-    TestInputOperator()
-    {
-      cleanTable();
-    }
-
-    @Override
-    public TestEvent getTuple(ResultSet result)
-    {
-      try {
-        return new TestEvent(result.getInt(1));
-      } catch (SQLException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public String queryToRetrieveData()
-    {
-      return retrieveQuery;
-    }
-  }
-
-  @Test
-  public void testJdbcOutputOperator()
-  {
-    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
-    transactionalStore.setDatabaseDriver(DB_DRIVER);
-    transactionalStore.setDatabaseUrl(URL);
-
-    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap 
attributeMap =
-        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
-    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
-    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
-        OPERATOR_ID, attributeMap);
-
-    TestOutputOperator outputOperator = new TestOutputOperator();
-    outputOperator.setBatchSize(3);
-    outputOperator.setStore(transactionalStore);
-
-    outputOperator.setup(context);
-
-    outputOperator.activate(context);
-    List<TestEvent> events = Lists.newArrayList();
-    for (int i = 0; i < 10; i++) {
-      events.add(new TestEvent(i));
-    }
-
-    outputOperator.beginWindow(0);
-    for (TestEvent event : events) {
-      outputOperator.input.process(event);
-    }
-    outputOperator.endWindow();
-
-    Assert.assertEquals("rows in db", 10, 
outputOperator.getNumOfEventsInStore());
-    cleanTable();
-  }
-
-  @Test
-  public void testJdbcPojoOutputOperator()
-  {
-    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
-    transactionalStore.setDatabaseDriver(DB_DRIVER);
-    transactionalStore.setDatabaseUrl(URL);
-
-    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap 
attributeMap =
-        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
-    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
-    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
-        OPERATOR_ID, attributeMap);
-
-    TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
-    outputOperator.setBatchSize(3);
-    outputOperator.setTablename(TABLE_POJO_NAME);
-
-    List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
-    fieldInfos.add(new JdbcFieldInfo("ID", "id", null, Types.INTEGER));
-    fieldInfos.add(new JdbcFieldInfo("NAME", "name", null, Types.VARCHAR));
-    outputOperator.setFieldInfos(fieldInfos);
-
-    outputOperator.setStore(transactionalStore);
-
-    outputOperator.setup(context);
-
-    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
-    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
-    TestPortContext tpc = new TestPortContext(portAttributes);
-    outputOperator.input.setup(tpc);
-
-    outputOperator.activate(context);
-
-    List<TestPOJOEvent> events = Lists.newArrayList();
-    for (int i = 0; i < 10; i++) {
-      events.add(new TestPOJOEvent(i, "test" + i));
-    }
-
-    outputOperator.beginWindow(0);
-    for (TestPOJOEvent event : events) {
-      outputOperator.input.process(event);
-    }
-    outputOperator.endWindow();
-
-    Assert.assertEquals("rows in db", 10, 
outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME));
-  }
-
-  /**
-   * This test will assume direct mapping for POJO fields to DB columns
-   * All fields in DB present in POJO
-   */
-  @Test
-  public void testJdbcPojoInsertOutputOperator()
-  {
-    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
-    transactionalStore.setDatabaseDriver(DB_DRIVER);
-    transactionalStore.setDatabaseUrl(URL);
-
-    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap 
attributeMap =
-        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
-    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
-    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
-        OPERATOR_ID, attributeMap);
-
-    TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
-    outputOperator.setBatchSize(3);
-    outputOperator.setTablename(TABLE_POJO_NAME);
-
-    outputOperator.setStore(transactionalStore);
-
-    outputOperator.setup(context);
-
-    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
-    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
-    TestPortContext tpc = new TestPortContext(portAttributes);
-    outputOperator.input.setup(tpc);
-
-    CollectorTestSink<Object> errorSink = new CollectorTestSink<>();
-    TestUtils.setSink(outputOperator.error, errorSink);
-
-    outputOperator.activate(context);
-
-    List<TestPOJOEvent> events = Lists.newArrayList();
-    for (int i = 0; i < 10; i++) {
-      events.add(new TestPOJOEvent(i, "test" + i));
-    }
-    events.add(new TestPOJOEvent(0, "test0")); // Records violating PK 
constraint
-    events.add(new TestPOJOEvent(2, "test2")); // Records violating PK 
constraint
-    events.add(new TestPOJOEvent(10, "test10")); // Clean record
-    events.add(new TestPOJOEvent(11, "test11")); // Clean record
-    events.add(new TestPOJOEvent(3, "test3")); // Records violating PK 
constraint
-    events.add(new TestPOJOEvent(12, "test12")); // Clean record
-
-    outputOperator.beginWindow(0);
-    for (TestPOJOEvent event : events) {
-      outputOperator.input.process(event);
-    }
-    outputOperator.endWindow();
-
-    Assert.assertEquals("rows in db", 13, 
outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME));
-    Assert.assertEquals("Error tuples", 3, errorSink.collectedTuples.size());
-  }
-
-  /**
-   * This test will assume direct mapping for POJO fields to DB columns
-   * Nullable DB field missing in POJO
-   * name1 field, which is nullable in DB is missing from POJO
-   * POJO(id, name) -> DB(id, name1)
-   */
-  @Test
-  public void testJdbcPojoInsertOutputOperatorNullName()
-  {
-    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
-    transactionalStore.setDatabaseDriver(DB_DRIVER);
-    transactionalStore.setDatabaseUrl(URL);
-
-    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap 
attributeMap =
-        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
-    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
-    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
-        OPERATOR_ID, attributeMap);
-
-    TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
-    outputOperator.setBatchSize(3);
-    outputOperator.setTablename(TABLE_POJO_NAME_NAME_DIFF);
-
-    outputOperator.setStore(transactionalStore);
-
-    outputOperator.setup(context);
-
-    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
-    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
-    TestPortContext tpc = new TestPortContext(portAttributes);
-    outputOperator.input.setup(tpc);
-
-    outputOperator.activate(context);
-
-    List<TestPOJOEvent> events = Lists.newArrayList();
-    for (int i = 0; i < 10; i++) {
-      events.add(new TestPOJOEvent(i, "test" + i));
-    }
-
-    outputOperator.beginWindow(0);
-    for (TestPOJOEvent event : events) {
-      outputOperator.input.process(event);
-    }
-    outputOperator.endWindow();
-
-    Assert.assertEquals("rows in db", 10, 
outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME_NAME_DIFF));
-    Assert.assertEquals("null name rows in db", 10,
-        outputOperator.getNumOfNullEventsInStore(TABLE_POJO_NAME_NAME_DIFF));
-  }
-
-  /**
-   * This test will assume direct mapping for POJO fields to DB columns.
-   * Non-Nullable DB field missing in POJO
-   * id1 field which is non-nullable in DB is missing from POJO
-   * POJO(id, name) -> DB(id1, name)
-   */
-  @Test
-  public void testJdbcPojoInsertOutputOperatorNullId()
-  {
-    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
-    transactionalStore.setDatabaseDriver(DB_DRIVER);
-    transactionalStore.setDatabaseUrl(URL);
-
-    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap 
attributeMap =
-        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
-    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
-    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
-        OPERATOR_ID, attributeMap);
-
-    TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
-    outputOperator.setBatchSize(3);
-    outputOperator.setTablename(TABLE_POJO_NAME_ID_DIFF);
-
-    outputOperator.setStore(transactionalStore);
-
-    outputOperator.setup(context);
-
-    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
-    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
-    TestPortContext tpc = new TestPortContext(portAttributes);
-    outputOperator.input.setup(tpc);
-
-    boolean exceptionOccurred = false;
-    try {
-      outputOperator.activate(context);
-    } catch (Exception e) {
-      exceptionOccurred = true;
-      Assert.assertTrue(e instanceof RuntimeException);
-      Assert.assertTrue(e.getMessage().toLowerCase().contains("id1 not found 
in pojo"));
-    }
-    Assert.assertTrue(exceptionOccurred);
-  }
-
-  @Test
-  public void testJdbcPojoOutputOperatorMerge()
-  {
-    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
-    transactionalStore.setDatabaseDriver(DB_DRIVER);
-    transactionalStore.setDatabaseUrl(URL);
-
-    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap 
attributeMap =
-        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
-    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
-    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
-        OPERATOR_ID, attributeMap);
-
-    TestPOJONonInsertOutputOperator updateOperator = new 
TestPOJONonInsertOutputOperator();
-    updateOperator.setBatchSize(3);
-
-    updateOperator.setStore(transactionalStore);
-
-    updateOperator.setSqlStatement("MERGE INTO " + TABLE_POJO_NAME + " AS T 
USING (VALUES (?, ?)) AS FOO(id, name) "
-        + "ON T.id = FOO.id "
-        + "WHEN MATCHED THEN UPDATE SET name = FOO.name "
-        + "WHEN NOT MATCHED THEN INSERT( id, name ) VALUES (FOO.id, 
FOO.name);");
-
-    List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
-    fieldInfos.add(new JdbcFieldInfo("id", "id", null, Types.INTEGER));
-    fieldInfos.add(new JdbcFieldInfo("name", "name", null, Types.VARCHAR));
-    updateOperator.setFieldInfos(fieldInfos);
-    updateOperator.setup(context);
-
-    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
-    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
-    TestPortContext tpc = new TestPortContext(portAttributes);
-    updateOperator.input.setup(tpc);
-
-    updateOperator.activate(context);
-
-    List<TestPOJOEvent> events = Lists.newArrayList();
-    for (int i = 0; i < 10; i++) {
-      events.add(new TestPOJOEvent(i, "test" + i));
-    }
-    for (int i = 0; i < 5; i++) {
-      events.add(new TestPOJOEvent(i, "test" + 100));
-    }
-
-    updateOperator.getDistinctNonUnique();
-    updateOperator.beginWindow(0);
-    for (TestPOJOEvent event : events) {
-      updateOperator.input.process(event);
-    }
-    updateOperator.endWindow();
-
-    // Expect 10 unique ids: 0 - 9
-    Assert.assertEquals("rows in db", 10, 
updateOperator.getNumOfEventsInStore());
-    // Expect 6 unique name: test-100, test-5, test-6, test-7, test-8, test-9
-    Assert.assertEquals("rows in db", 6, 
updateOperator.getDistinctNonUnique());
-  }
-
-  @Test
-  public void testJdbcInputOperator()
-  {
-    JdbcStore store = new JdbcStore();
-    store.setDatabaseDriver(DB_DRIVER);
-    store.setDatabaseUrl(URL);
-
-    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap 
attributeMap =
-        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
-    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
-    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
-        OPERATOR_ID, attributeMap);
-
-    TestInputOperator inputOperator = new TestInputOperator();
-    inputOperator.setStore(store);
-    insertEventsInTable(10);
-
-    CollectorTestSink<Object> sink = new CollectorTestSink<>();
-    inputOperator.outputPort.setSink(sink);
-
-    inputOperator.setup(context);
-    inputOperator.beginWindow(0);
-    inputOperator.emitTuples();
-    inputOperator.endWindow();
-
-    Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
-  }
-
-  @Test
-  public void testJdbcPojoInputOperator()
-  {
-    JdbcStore store = new JdbcStore();
-    store.setDatabaseDriver(DB_DRIVER);
-    store.setDatabaseUrl(URL);
-
-    Attribute.AttributeMap.DefaultAttributeMap attributeMap = new 
Attribute.AttributeMap.DefaultAttributeMap();
-    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
-    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
-        OPERATOR_ID, attributeMap);
-    
-    insertEvents(10,true, 0);
-
-    JdbcPOJOInputOperator inputOperator = new JdbcPOJOInputOperator();
-    inputOperator.setStore(store);
-    inputOperator.setTableName(TABLE_POJO_NAME);
-
-    List<FieldInfo> fieldInfos = Lists.newArrayList();
-    fieldInfos.add(new FieldInfo("ID", "id", null));
-    fieldInfos.add(new FieldInfo("STARTDATE", "startDate", null));
-    fieldInfos.add(new FieldInfo("STARTTIME", "startTime", null));
-    fieldInfos.add(new FieldInfo("STARTTIMESTAMP", "startTimestamp", null));
-    fieldInfos.add(new FieldInfo("SCORE", "score", 
FieldInfo.SupportType.DOUBLE));
-    inputOperator.setFieldInfos(fieldInfos);
-
-    inputOperator.setFetchSize(5);
-
-    CollectorTestSink<Object> sink = new CollectorTestSink<>();
-    inputOperator.outputPort.setSink(sink);
-
-    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
-    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
-    TestPortContext tpc = new TestPortContext(portAttributes);
-
-    inputOperator.setup(context);
-    inputOperator.outputPort.setup(tpc);
-
-    inputOperator.activate(context);
-
-    inputOperator.beginWindow(0);
-    inputOperator.emitTuples();
-    inputOperator.endWindow();
-
-    Assert.assertEquals("rows from db", 5, sink.collectedTuples.size());
-    int i = 0;
-    for (Object tuple : sink.collectedTuples) {
-      TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple;
-      Assert.assertTrue("i=" + i, pojoEvent.getId() == i);
-      Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date);
-      Assert.assertTrue("time", pojoEvent.getStartTime() instanceof Time);
-      Assert.assertTrue("timestamp", pojoEvent.getStartTimestamp() instanceof 
Timestamp);
-      i++;
-    }
-    sink.collectedTuples.clear();
-
-    inputOperator.beginWindow(1);
-    inputOperator.emitTuples();
-    inputOperator.endWindow();
-
-    Assert.assertEquals("rows from db", 5, sink.collectedTuples.size());
-    for (Object tuple : sink.collectedTuples) {
-      TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple;
-      Assert.assertTrue("i=" + i, pojoEvent.getId() == i);
-      Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date);
-      Assert.assertTrue("time", pojoEvent.getStartTime() instanceof Time);
-      Assert.assertTrue("timestamp", pojoEvent.getStartTimestamp() instanceof 
Timestamp);
-      Assert.assertTrue("score", pojoEvent.getScore() == 55.4);
-      i++;
-    }
-
-    sink.collectedTuples.clear();
-
-    inputOperator.beginWindow(2);
-    inputOperator.emitTuples();
-    inputOperator.endWindow();
-
-    Assert.assertEquals("rows from db", 0, sink.collectedTuples.size());
-    
-    // Insert 3 more tuples and check if they are read successfully.
-    insertEvents(3, false, 10);
-
-    inputOperator.beginWindow(3);
-    inputOperator.emitTuples();
-    inputOperator.endWindow();
-
-    Assert.assertEquals("rows from db", 3, sink.collectedTuples.size());
-    for (Object tuple : sink.collectedTuples) {
-      TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple;
-      Assert.assertTrue("i=" + i, pojoEvent.getId() == i);
-      Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date);
-      Assert.assertTrue("time", pojoEvent.getStartTime() instanceof Time);
-      Assert.assertTrue("timestamp", pojoEvent.getStartTimestamp() instanceof 
Timestamp);
-      Assert.assertTrue("score", pojoEvent.getScore() == 55.4);
-      i++;
-    }
-  }
-  
-
-  private void insertEvents(int numEvents, boolean cleanExistingRows, int 
startRowId)
+  protected static void insertEvents(int numEvents, boolean cleanExistingRows, 
int startRowId)
   {
     try (Connection con = DriverManager.getConnection(URL); Statement stmt = 
con.createStatement()) {
       if (cleanExistingRows) {
@@ -798,13 +218,14 @@ public class JdbcOperatorTest
 
       for (int i = 0; i < numEvents; i++) {
         pStmt.setInt(1, startRowId + i);
-        pStmt.setString(2, "name");
+        pStmt.setString(2, "name" + i);
         pStmt.setDate(3, new Date(2016, 1, 1));
         pStmt.setTime(4, new Time(2016, 1, 1));
         pStmt.setTimestamp(5, new Timestamp(2016, 1, 1, 0, 0, 0, 0));
         pStmt.setDouble(6, new Double(55.4));
         pStmt.executeUpdate();
       }
+     
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java 
b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java
new file mode 100644
index 0000000..e6d8b42
--- /dev/null
+++ 
b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java
@@ -0,0 +1,606 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.List;
+
+import javax.annotation.Nonnull;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.helper.TestPortContext;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Tests for {@link AbstractJdbcTransactionableOutputOperator} and
+ * {@link AbstractJdbcInputOperator}
+ */
+public class JdbcPojoOperatorTest extends JdbcOperatorTest
+{
+
+  private static class TestOutputOperator extends 
AbstractJdbcTransactionableOutputOperator<TestEvent>
+  {
+    private static final String INSERT_STMT = "INSERT INTO " + TABLE_NAME + " 
values (?)";
+
+    TestOutputOperator()
+    {
+      cleanTable();
+    }
+
+    @Nonnull
+    @Override
+    protected String getUpdateCommand()
+    {
+      return INSERT_STMT;
+    }
+
+    @Override
+    protected void setStatementParameters(PreparedStatement statement, 
TestEvent tuple) throws SQLException
+    {
+      statement.setInt(1, tuple.id);
+    }
+
+    public int getNumOfEventsInStore()
+    {
+      Connection con;
+      try {
+        con = DriverManager.getConnection(URL);
+        Statement stmt = con.createStatement();
+
+        String countQuery = "SELECT count(*) from " + TABLE_NAME;
+        ResultSet resultSet = stmt.executeQuery(countQuery);
+        resultSet.next();
+        return resultSet.getInt(1);
+      } catch (SQLException e) {
+        throw new RuntimeException("fetching count", e);
+      }
+    }
+  }
+
+  private static class TestInputOperator extends 
AbstractJdbcInputOperator<TestEvent>
+  {
+
+    private static final String retrieveQuery = "SELECT * FROM " + TABLE_NAME;
+
+    TestInputOperator()
+    {
+      cleanTable();
+    }
+
+    @Override
+    public TestEvent getTuple(ResultSet result)
+    {
+      try {
+        return new TestEvent(result.getInt(1));
+      } catch (SQLException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public String queryToRetrieveData()
+    {
+      return retrieveQuery;
+    }
+  }
+
+  private static class TestPOJOOutputOperator extends 
JdbcPOJOInsertOutputOperator
+  {
+    TestPOJOOutputOperator()
+    {
+      cleanTable();
+    }
+
+    public int getNumOfEventsInStore(String tableName)
+    {
+      Connection con;
+      try {
+        con = DriverManager.getConnection(URL);
+        Statement stmt = con.createStatement();
+
+        String countQuery = "SELECT count(*) from " + tableName;
+        ResultSet resultSet = stmt.executeQuery(countQuery);
+        resultSet.next();
+        return resultSet.getInt(1);
+      } catch (SQLException e) {
+        throw new RuntimeException("fetching count", e);
+      }
+    }
+
+    public int getNumOfNullEventsInStore(String tableName)
+    {
+      Connection con;
+      try {
+        con = DriverManager.getConnection(URL);
+        Statement stmt = con.createStatement();
+
+        String countQuery = "SELECT count(*) from " + tableName + " where 
name1 is null";
+        ResultSet resultSet = stmt.executeQuery(countQuery);
+        resultSet.next();
+        return resultSet.getInt(1);
+      } catch (SQLException e) {
+        throw new RuntimeException("fetching count", e);
+      }
+    }
+
+    private static class TestPOJONonInsertOutputOperator extends 
JdbcPOJONonInsertOutputOperator
+    {
+      public TestPOJONonInsertOutputOperator()
+      {
+        cleanTable();
+      }
+
+      public int getNumOfEventsInStore()
+      {
+        Connection con;
+        try {
+          con = DriverManager.getConnection(URL);
+          Statement stmt = con.createStatement();
+
+          String countQuery = "SELECT count(*) from " + TABLE_POJO_NAME;
+          ResultSet resultSet = stmt.executeQuery(countQuery);
+          resultSet.next();
+          return resultSet.getInt(1);
+        } catch (SQLException e) {
+          throw new RuntimeException("fetching count", e);
+        }
+      }
+
+      public int getDistinctNonUnique()
+      {
+        Connection con;
+        try {
+          con = DriverManager.getConnection(URL);
+          Statement stmt = con.createStatement();
+
+          String countQuery = "SELECT count(distinct(name)) from " + 
TABLE_POJO_NAME;
+          ResultSet resultSet = stmt.executeQuery(countQuery);
+          resultSet.next();
+          return resultSet.getInt(1);
+        } catch (SQLException e) {
+          throw new RuntimeException("fetching count", e);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testJdbcOutputOperator()
+  {
+    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
+    transactionalStore.setDatabaseDriver(DB_DRIVER);
+    transactionalStore.setDatabaseUrl(URL);
+
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap 
attributeMap = new 
com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
+        OPERATOR_ID, attributeMap);
+
+    TestOutputOperator outputOperator = new TestOutputOperator();
+    outputOperator.setBatchSize(3);
+    outputOperator.setStore(transactionalStore);
+
+    outputOperator.setup(context);
+
+    outputOperator.activate(context);
+    List<TestEvent> events = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      events.add(new TestEvent(i));
+    }
+
+    outputOperator.beginWindow(0);
+    for (TestEvent event : events) {
+      outputOperator.input.process(event);
+    }
+    outputOperator.endWindow();
+
+    Assert.assertEquals("rows in db", 10, 
outputOperator.getNumOfEventsInStore());
+    cleanTable();
+  }
+
+  @Test
+  public void testJdbcPojoOutputOperator()
+  {
+    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
+    transactionalStore.setDatabaseDriver(DB_DRIVER);
+    transactionalStore.setDatabaseUrl(URL);
+
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap 
attributeMap = new 
com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
+        OPERATOR_ID, attributeMap);
+
+    TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
+    outputOperator.setBatchSize(3);
+    outputOperator.setTablename(TABLE_POJO_NAME);
+
+    List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new JdbcFieldInfo("ID", "id", null, Types.INTEGER));
+    fieldInfos.add(new JdbcFieldInfo("NAME", "name", null, Types.VARCHAR));
+    outputOperator.setFieldInfos(fieldInfos);
+
+    outputOperator.setStore(transactionalStore);
+
+    outputOperator.setup(context);
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+    TestPortContext tpc = new TestPortContext(portAttributes);
+    outputOperator.input.setup(tpc);
+
+    outputOperator.activate(context);
+
+    List<TestPOJOEvent> events = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      events.add(new TestPOJOEvent(i, "test" + i));
+    }
+
+    outputOperator.beginWindow(0);
+    for (TestPOJOEvent event : events) {
+      outputOperator.input.process(event);
+    }
+    outputOperator.endWindow();
+
+    Assert.assertEquals("rows in db", 10, 
outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME));
+  }
+
+  /**
+   * This test assumes direct mapping of POJO fields to DB columns; all
+   * fields present in the DB table are also present in the POJO.
+   */
+  @Test
+  public void testJdbcPojoInsertOutputOperator()
+  {
+    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
+    transactionalStore.setDatabaseDriver(DB_DRIVER);
+    transactionalStore.setDatabaseUrl(URL);
+
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap 
attributeMap = new 
com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
+        OPERATOR_ID, attributeMap);
+
+    TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
+    outputOperator.setBatchSize(3);
+    outputOperator.setTablename(TABLE_POJO_NAME);
+
+    outputOperator.setStore(transactionalStore);
+
+    outputOperator.setup(context);
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+    TestPortContext tpc = new TestPortContext(portAttributes);
+    outputOperator.input.setup(tpc);
+
+    CollectorTestSink<Object> errorSink = new CollectorTestSink<>();
+    TestUtils.setSink(outputOperator.error, errorSink);
+
+    outputOperator.activate(context);
+
+    List<TestPOJOEvent> events = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      events.add(new TestPOJOEvent(i, "test" + i));
+    }
+    events.add(new TestPOJOEvent(0, "test0")); // Records violating PK 
constraint
+    events.add(new TestPOJOEvent(2, "test2")); // Records violating PK 
constraint
+    events.add(new TestPOJOEvent(10, "test10")); // Clean record
+    events.add(new TestPOJOEvent(11, "test11")); // Clean record
+    events.add(new TestPOJOEvent(3, "test3")); // Records violating PK 
constraint
+    events.add(new TestPOJOEvent(12, "test12")); // Clean record
+
+    outputOperator.beginWindow(0);
+    for (TestPOJOEvent event : events) {
+      outputOperator.input.process(event);
+    }
+    outputOperator.endWindow();
+
+    Assert.assertEquals("rows in db", 13, 
outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME));
+    Assert.assertEquals("Error tuples", 3, errorSink.collectedTuples.size());
+  }
+
+  /**
+   * This test assumes direct mapping of POJO fields to DB columns, with a
+   * nullable DB column missing from the POJO: the name1 column, which is
+   * nullable in the DB, has no POJO field. POJO(id, name) -> DB(id, name1)
+   */
+  @Test
+  public void testJdbcPojoInsertOutputOperatorNullName()
+  {
+    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
+    transactionalStore.setDatabaseDriver(DB_DRIVER);
+    transactionalStore.setDatabaseUrl(URL);
+
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap 
attributeMap = new 
com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
+        OPERATOR_ID, attributeMap);
+
+    TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
+    outputOperator.setBatchSize(3);
+    outputOperator.setTablename(TABLE_POJO_NAME_NAME_DIFF);
+
+    outputOperator.setStore(transactionalStore);
+
+    outputOperator.setup(context);
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+    TestPortContext tpc = new TestPortContext(portAttributes);
+    outputOperator.input.setup(tpc);
+
+    outputOperator.activate(context);
+
+    List<TestPOJOEvent> events = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      events.add(new TestPOJOEvent(i, "test" + i));
+    }
+
+    outputOperator.beginWindow(0);
+    for (TestPOJOEvent event : events) {
+      outputOperator.input.process(event);
+    }
+    outputOperator.endWindow();
+
+    Assert.assertEquals("rows in db", 10, 
outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME_NAME_DIFF));
+    Assert
+        .assertEquals("null name rows in db", 10, 
outputOperator.getNumOfNullEventsInStore(TABLE_POJO_NAME_NAME_DIFF));
+  }
+
+  /**
+   * This test assumes direct mapping of POJO fields to DB columns, with a
+   * non-nullable DB column missing from the POJO: the id1 column, which is
+   * non-nullable in the DB, has no POJO field. POJO(id, name) -> DB(id1, name)
+   */
+  @Test
+  public void testJdbcPojoInsertOutputOperatorNullId()
+  {
+    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
+    transactionalStore.setDatabaseDriver(DB_DRIVER);
+    transactionalStore.setDatabaseUrl(URL);
+
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap 
attributeMap = new 
com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
+        OPERATOR_ID, attributeMap);
+
+    TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
+    outputOperator.setBatchSize(3);
+    outputOperator.setTablename(TABLE_POJO_NAME_ID_DIFF);
+
+    outputOperator.setStore(transactionalStore);
+
+    outputOperator.setup(context);
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+    TestPortContext tpc = new TestPortContext(portAttributes);
+    outputOperator.input.setup(tpc);
+
+    boolean exceptionOccurred = false;
+    try {
+      outputOperator.activate(context);
+    } catch (Exception e) {
+      exceptionOccurred = true;
+      Assert.assertTrue(e instanceof RuntimeException);
+      Assert.assertTrue(e.getMessage().toLowerCase().contains("id1 not found 
in pojo"));
+    }
+    Assert.assertTrue(exceptionOccurred);
+  }
+
+  @Test
+  public void testJdbcPojoOutputOperatorMerge()
+  {
+    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
+    transactionalStore.setDatabaseDriver(DB_DRIVER);
+    transactionalStore.setDatabaseUrl(URL);
+
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap 
attributeMap = new 
com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
+        OPERATOR_ID, attributeMap);
+
+    TestPOJOOutputOperator.TestPOJONonInsertOutputOperator updateOperator = 
new TestPOJOOutputOperator.TestPOJONonInsertOutputOperator();
+    updateOperator.setBatchSize(3);
+
+    updateOperator.setStore(transactionalStore);
+
+    updateOperator.setSqlStatement("MERGE INTO " + TABLE_POJO_NAME + " AS T 
USING (VALUES (?, ?)) AS FOO(id, name) "
+        + "ON T.id = FOO.id " + "WHEN MATCHED THEN UPDATE SET name = FOO.name "
+        + "WHEN NOT MATCHED THEN INSERT( id, name ) VALUES (FOO.id, 
FOO.name);");
+
+    List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new JdbcFieldInfo("id", "id", null, Types.INTEGER));
+    fieldInfos.add(new JdbcFieldInfo("name", "name", null, Types.VARCHAR));
+    updateOperator.setFieldInfos(fieldInfos);
+    updateOperator.setup(context);
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+    TestPortContext tpc = new TestPortContext(portAttributes);
+    updateOperator.input.setup(tpc);
+
+    updateOperator.activate(context);
+
+    List<TestPOJOEvent> events = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      events.add(new TestPOJOEvent(i, "test" + i));
+    }
+    for (int i = 0; i < 5; i++) {
+      events.add(new TestPOJOEvent(i, "test" + 100));
+    }
+
+    updateOperator.getDistinctNonUnique();
+    updateOperator.beginWindow(0);
+    for (TestPOJOEvent event : events) {
+      updateOperator.input.process(event);
+    }
+    updateOperator.endWindow();
+
+    // Expect 10 unique ids: 0 - 9
+    Assert.assertEquals("rows in db", 10, 
updateOperator.getNumOfEventsInStore());
+    // Expect 6 unique names: test100, test5, test6, test7, test8, test9
+    Assert.assertEquals("rows in db", 6, 
updateOperator.getDistinctNonUnique());
+  }
+
+  @Test
+  public void testJdbcInputOperator()
+  {
+    JdbcStore store = new JdbcStore();
+    store.setDatabaseDriver(DB_DRIVER);
+    store.setDatabaseUrl(URL);
+
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap 
attributeMap = new 
com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
+        OPERATOR_ID, attributeMap);
+
+    TestInputOperator inputOperator = new TestInputOperator();
+    inputOperator.setStore(store);
+    insertEventsInTable(10);
+
+    CollectorTestSink<Object> sink = new CollectorTestSink<>();
+    inputOperator.outputPort.setSink(sink);
+
+    inputOperator.setup(context);
+    inputOperator.beginWindow(0);
+    inputOperator.emitTuples();
+    inputOperator.endWindow();
+
+    Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
+  }
+
+  @Test
+  public void testJdbcPojoInputOperator()
+  {
+    JdbcStore store = new JdbcStore();
+    store.setDatabaseDriver(DB_DRIVER);
+    store.setDatabaseUrl(URL);
+
+    Attribute.AttributeMap.DefaultAttributeMap attributeMap = new 
Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
+        OPERATOR_ID, attributeMap);
+
+    insertEvents(10, true, 0);
+
+    JdbcPOJOInputOperator inputOperator = new JdbcPOJOInputOperator();
+    inputOperator.setStore(store);
+    inputOperator.setTableName(TABLE_POJO_NAME);
+
+    List<FieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new FieldInfo("ID", "id", null));
+    fieldInfos.add(new FieldInfo("STARTDATE", "startDate", null));
+    fieldInfos.add(new FieldInfo("STARTTIME", "startTime", null));
+    fieldInfos.add(new FieldInfo("STARTTIMESTAMP", "startTimestamp", null));
+    fieldInfos.add(new FieldInfo("SCORE", "score", 
FieldInfo.SupportType.DOUBLE));
+    inputOperator.setFieldInfos(fieldInfos);
+
+    inputOperator.setFetchSize(5);
+
+    CollectorTestSink<Object> sink = new CollectorTestSink<>();
+    inputOperator.outputPort.setSink(sink);
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+    TestPortContext tpc = new TestPortContext(portAttributes);
+
+    inputOperator.setup(context);
+    inputOperator.outputPort.setup(tpc);
+
+    inputOperator.activate(context);
+
+    inputOperator.beginWindow(0);
+    inputOperator.emitTuples();
+    inputOperator.endWindow();
+
+    Assert.assertEquals("rows from db", 5, sink.collectedTuples.size());
+    int i = 0;
+    for (Object tuple : sink.collectedTuples) {
+      TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple;
+      Assert.assertTrue("i=" + i, pojoEvent.getId() == i);
+      Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date);
+      Assert.assertTrue("time", pojoEvent.getStartTime() instanceof Time);
+      Assert.assertTrue("timestamp", pojoEvent.getStartTimestamp() instanceof 
Timestamp);
+      i++;
+    }
+    sink.collectedTuples.clear();
+
+    inputOperator.beginWindow(1);
+    inputOperator.emitTuples();
+    inputOperator.endWindow();
+
+    Assert.assertEquals("rows from db", 5, sink.collectedTuples.size());
+    for (Object tuple : sink.collectedTuples) {
+      TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple;
+      Assert.assertTrue("i=" + i, pojoEvent.getId() == i);
+      Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date);
+      Assert.assertTrue("time", pojoEvent.getStartTime() instanceof Time);
+      Assert.assertTrue("timestamp", pojoEvent.getStartTimestamp() instanceof 
Timestamp);
+      Assert.assertTrue("score", pojoEvent.getScore() == 55.4);
+      i++;
+    }
+
+    sink.collectedTuples.clear();
+
+    inputOperator.beginWindow(2);
+    inputOperator.emitTuples();
+    inputOperator.endWindow();
+
+    Assert.assertEquals("rows from db", 0, sink.collectedTuples.size());
+
+    // Insert 3 more tuples and check if they are read successfully.
+    insertEvents(3, false, 10);
+
+    inputOperator.beginWindow(3);
+    inputOperator.emitTuples();
+    inputOperator.endWindow();
+
+    Assert.assertEquals("rows from db", 3, sink.collectedTuples.size());
+    for (Object tuple : sink.collectedTuples) {
+      TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple;
+      Assert.assertTrue("i=" + i, pojoEvent.getId() == i);
+      Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date);
+      Assert.assertTrue("time", pojoEvent.getStartTime() instanceof Time);
+      Assert.assertTrue("timestamp", pojoEvent.getStartTimestamp() instanceof 
Timestamp);
+      Assert.assertTrue("score", pojoEvent.getScore() == 55.4);
+      i++;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
 
b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
new file mode 100644
index 0000000..2f3f356
--- /dev/null
+++ 
b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.MutablePair;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.helper.TestPortContext;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest
+{
+  public String dir = null;
+  @Mock
+  private ScheduledExecutorService mockscheduler;
+  @Mock
+  private ScheduledFuture futureTaskMock;
+  @Mock
+  private WindowDataManager windowDataManagerMock;
+
+  @Before
+  public void beforeTest()
+  {
+    dir = "target/" + APP_ID + "/";
+
+    MockitoAnnotations.initMocks(this);
+    when(mockscheduler.scheduleWithFixedDelay(any(Runnable.class), anyLong(), 
anyLong(), any(TimeUnit.class)))
+        .thenReturn(futureTaskMock);
+  }
+
+  @After
+  public void afterTest() throws IOException
+  {
+    cleanTable();
+    FileUtils.deleteDirectory(new File(dir));
+  }
+
+  @Test
+  public void testDBPoller() throws InterruptedException
+  {
+    insertEvents(10, true, 0);
+
+    JdbcStore store = new JdbcStore();
+    store.setDatabaseDriver(DB_DRIVER);
+    store.setDatabaseUrl(URL);
+
+    List<FieldInfo> fieldInfos = getFieldInfos();
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+    TestPortContext tpc = new TestPortContext(portAttributes);
+
+    JdbcPOJOPollInputOperator inputOperator = new JdbcPOJOPollInputOperator();
+    inputOperator.setStore(store);
+    inputOperator.setTableName(TABLE_POJO_NAME);
+    inputOperator.setKey("id");
+    inputOperator.setFieldInfos(fieldInfos);
+    inputOperator.setFetchSize(100);
+    inputOperator.setBatchSize(100);
+    inputOperator.setPartitionCount(2);
+
+    
Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>>
 newPartitions = inputOperator
+        .definePartitions(new 
ArrayList<Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>>(), 
null);
+
+    int operatorId = 0;
+    for 
(com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>
 partition : newPartitions) {
+
+      Attribute.AttributeMap.DefaultAttributeMap partitionAttributeMap = new 
Attribute.AttributeMap.DefaultAttributeMap();
+      partitionAttributeMap.put(DAG.APPLICATION_ID, APP_ID);
+      partitionAttributeMap.put(Context.DAGContext.APPLICATION_PATH, dir);
+
+      OperatorContextTestHelper.TestIdOperatorContext partitioningContext = 
new OperatorContextTestHelper.TestIdOperatorContext(
+          operatorId++, partitionAttributeMap);
+
+      JdbcPOJOPollInputOperator parition = 
(JdbcPOJOPollInputOperator)partition.getPartitionedInstance();
+      parition.outputPort.setup(tpc);
+      parition.setScheduledExecutorService(mockscheduler);
+      parition.setup(partitioningContext);
+      parition.activate(partitioningContext);
+    }
+
+    
Iterator<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>>
 itr = newPartitions
+        .iterator();
+    // The first partition is for range queries, the last is for polling queries
+    JdbcPOJOPollInputOperator firstInstance = 
(JdbcPOJOPollInputOperator)itr.next().getPartitionedInstance();
+    CollectorTestSink<Object> sink1 = new CollectorTestSink<>();
+    firstInstance.outputPort.setSink(sink1);
+    firstInstance.beginWindow(0);
+    firstInstance.pollRecords();
+    firstInstance.pollRecords();
+    firstInstance.emitTuples();
+    firstInstance.endWindow();
+
+    Assert.assertEquals("rows from db", 5, sink1.collectedTuples.size());
+    for (Object tuple : sink1.collectedTuples) {
+      TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple;
+      Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date);
+      Assert.assertTrue("date", pojoEvent.getId() < 5);
+    }
+
+    JdbcPOJOPollInputOperator secondInstance = 
(JdbcPOJOPollInputOperator)itr.next().getPartitionedInstance();
+    CollectorTestSink<Object> sink2 = new CollectorTestSink<>();
+    secondInstance.outputPort.setSink(sink2);
+    secondInstance.beginWindow(0);
+    secondInstance.pollRecords();
+    secondInstance.emitTuples();
+    secondInstance.endWindow();
+
+    Assert.assertEquals("rows from db", 5, sink2.collectedTuples.size());
+    for (Object tuple : sink2.collectedTuples) {
+      TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple;
+      Assert.assertTrue("date", pojoEvent.getId() < 10 && pojoEvent.getId() >= 
5);
+    }
+
+    insertEvents(4, false, 10);
+    JdbcPOJOPollInputOperator thirdInstance = 
(JdbcPOJOPollInputOperator)itr.next().getPartitionedInstance();
+    CollectorTestSink<Object> sink3 = new CollectorTestSink<>();
+    thirdInstance.outputPort.setSink(sink3);
+    thirdInstance.beginWindow(0);
+    thirdInstance.pollRecords();
+    thirdInstance.emitTuples();
+    thirdInstance.endWindow();
+
+    Assert.assertEquals("rows from db", 4, sink3.collectedTuples.size());
+  }
+
+  @Test
+  public void testRecovery() throws IOException
+  {
+    int operatorId = 1;
+    when(windowDataManagerMock.getLargestRecoveryWindow()).thenReturn(1L);
+    when(windowDataManagerMock.load(operatorId, 1)).thenReturn(new 
MutablePair<Integer, Integer>(0, 4));
+
+    insertEvents(10, true, 0);
+
+    JdbcStore store = new JdbcStore();
+    store.setDatabaseDriver(DB_DRIVER);
+    store.setDatabaseUrl(URL);
+
+    List<FieldInfo> fieldInfos = getFieldInfos();
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+    TestPortContext tpc = new TestPortContext(portAttributes);
+
+    Attribute.AttributeMap.DefaultAttributeMap partitionAttributeMap = new 
Attribute.AttributeMap.DefaultAttributeMap();
+    partitionAttributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    partitionAttributeMap.put(Context.DAGContext.APPLICATION_PATH, dir);
+
+    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
+        operatorId, partitionAttributeMap);
+
+    JdbcPOJOPollInputOperator inputOperator = new JdbcPOJOPollInputOperator();
+    inputOperator.setStore(store);
+    inputOperator.setTableName(TABLE_POJO_NAME);
+    inputOperator.setKey("id");
+    inputOperator.setFieldInfos(fieldInfos);
+    inputOperator.setFetchSize(100);
+    inputOperator.setBatchSize(100);
+    inputOperator.lastEmittedRow = 0; //setting as not calling partition logic
+    inputOperator.rangeQueryPair = new KeyValPair<Integer, Integer>(0, 8);
+
+    inputOperator.outputPort.setup(tpc);
+    inputOperator.setScheduledExecutorService(mockscheduler);
+    inputOperator.setup(context);
+    inputOperator.setWindowManager(windowDataManagerMock);
+    inputOperator.activate(context);
+
+    CollectorTestSink<Object> sink = new CollectorTestSink<>();
+    inputOperator.outputPort.setSink(sink);
+    inputOperator.beginWindow(0);
+    verify(mockscheduler, times(0)).scheduleAtFixedRate(any(Runnable.class), 
anyLong(), anyLong(), any(TimeUnit.class));
+    inputOperator.emitTuples();
+    inputOperator.endWindow();
+    inputOperator.beginWindow(1);
+    verify(mockscheduler, times(1)).scheduleAtFixedRate(any(Runnable.class), 
anyLong(), anyLong(), any(TimeUnit.class));
+
+  }
+
+  private List<FieldInfo> getFieldInfos()
+  {
+    List<FieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new FieldInfo("ID", "id", null));
+    fieldInfos.add(new FieldInfo("STARTDATE", "startDate", null));
+    fieldInfos.add(new FieldInfo("STARTTIME", "startTime", null));
+    fieldInfos.add(new FieldInfo("STARTTIMESTAMP", "startTimestamp", null));
+    fieldInfos.add(new FieldInfo("NAME", "name", null));
+    return fieldInfos;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java 
b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java
deleted file mode 100644
index 573e45d..0000000
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.db.jdbc;
-
-import java.io.File;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.commons.io.FileUtils;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.api.Attribute;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DefaultPartition;
-import com.datatorrent.api.Partitioner;
-import com.datatorrent.lib.helper.OperatorContextTestHelper;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-
-/**
- * Tests for {@link AbstractJdbcPollInputOperator} and
- * {@link JdbcPollInputOperator}
- */
-public class JdbcPollerTest
-{
-  public static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
-  public static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
-
-  private static final String TABLE_NAME = "test_account_table";
-  private static String APP_ID = "JdbcPollingOperatorTest";
-  public String dir = null;
-
-  @BeforeClass
-  public static void setup()
-  {
-    try {
-      cleanup();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    try {
-      Class.forName(DB_DRIVER).newInstance();
-
-      Connection con = DriverManager.getConnection(URL);
-      Statement stmt = con.createStatement();
-
-      String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
-          + " (Account_No INTEGER, Name VARCHAR(255), Amount INTEGER)";
-      stmt.executeUpdate(createTable);
-
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @AfterClass
-  public static void cleanup()
-  {
-    try {
-      FileUtils.deleteDirectory(new File("target/" + APP_ID));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static void cleanTable()
-  {
-    try {
-      Connection con = DriverManager.getConnection(URL);
-      Statement stmt = con.createStatement();
-      String cleanTable = "delete from " + TABLE_NAME;
-      stmt.executeUpdate(cleanTable);
-    } catch (SQLException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static void insertEventsInTable(int numEvents, int offset)
-  {
-    try {
-      Connection con = DriverManager.getConnection(URL);
-      String insert = "insert into " + TABLE_NAME + " values (?,?,?)";
-      PreparedStatement stmt = con.prepareStatement(insert);
-      for (int i = 0; i < numEvents; i++, offset++) {
-        stmt.setInt(1, offset);
-        stmt.setString(2, "Account_Holder-" + offset);
-        stmt.setInt(3, (offset * 1000));
-        stmt.executeUpdate();
-      }
-    } catch (SQLException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Simulates the actual application flow: adds a batch-query partition and a
-   * pollable partition. Incremental record polling is also checked.
-   */
-  @Test
-  public void testJdbcPollingInputOperatorBatch() throws InterruptedException
-  {
-    cleanTable();
-    insertEventsInTable(10, 0);
-    JdbcStore store = new JdbcStore();
-    store.setDatabaseDriver(DB_DRIVER);
-    store.setDatabaseUrl(URL);
-
-    Attribute.AttributeMap.DefaultAttributeMap attributeMap = new 
Attribute.AttributeMap.DefaultAttributeMap();
-    this.dir = "target/" + APP_ID + "/";
-    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
-    attributeMap.put(Context.DAGContext.APPLICATION_PATH, dir);
-
-    JdbcPollInputOperator inputOperator = new JdbcPollInputOperator();
-    inputOperator.setStore(store);
-    inputOperator.setBatchSize(100);
-    inputOperator.setPollInterval(1000);
-    inputOperator.setEmitColumnList("Account_No,Name,Amount");
-    inputOperator.setKey("Account_No");
-    inputOperator.setTableName(TABLE_NAME);
-    inputOperator.setFetchSize(100);
-    inputOperator.setPartitionCount(1);
-
-    CollectorTestSink<Object> sink = new CollectorTestSink<>();
-    inputOperator.outputPort.setSink(sink);
-
-    TestUtils.MockBatchedOperatorStats readerStats = new 
TestUtils.MockBatchedOperatorStats(2);
-
-    DefaultPartition<AbstractJdbcPollInputOperator<Object>> apartition = new 
DefaultPartition<AbstractJdbcPollInputOperator<Object>>(
-        inputOperator);
-
-    TestUtils.MockPartition<AbstractJdbcPollInputOperator<Object>> 
pseudoParttion = new TestUtils.MockPartition<>(
-        apartition, readerStats);
-
-    List<Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> 
newMocks = Lists.newArrayList();
-
-    newMocks.add(pseudoParttion);
-
-    
Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>>
 newPartitions = inputOperator
-        .definePartitions(newMocks, null);
-
-    
Iterator<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>>
 itr = newPartitions
-        .iterator();
-
-    int operatorId = 0;
-    for 
(com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>
 partition : newPartitions) {
-
-      Attribute.AttributeMap.DefaultAttributeMap partitionAttributeMap = new 
Attribute.AttributeMap.DefaultAttributeMap();
-      this.dir = "target/" + APP_ID + "/";
-      partitionAttributeMap.put(DAG.APPLICATION_ID, APP_ID);
-      partitionAttributeMap.put(Context.DAGContext.APPLICATION_PATH, dir);
-
-      OperatorContextTestHelper.TestIdOperatorContext partitioningContext = 
new OperatorContextTestHelper.TestIdOperatorContext(
-          operatorId++, partitionAttributeMap);
-
-      partition.getPartitionedInstance().setup(partitioningContext);
-      partition.getPartitionedInstance().activate(partitioningContext);
-    }
-
-    //First partition is for range queries,last is for polling queries
-    AbstractJdbcPollInputOperator<Object> newInstance = 
itr.next().getPartitionedInstance();
-    CollectorTestSink<Object> sink1 = new CollectorTestSink<>();
-    newInstance.outputPort.setSink(sink1);
-    newInstance.beginWindow(1);
-    Thread.sleep(50);
-    newInstance.emitTuples();
-    newInstance.endWindow();
-
-    Assert.assertEquals("rows from db", 10, sink1.collectedTuples.size());
-    int i = 0;
-    for (Object tuple : sink1.collectedTuples) {
-      String[] pojoEvent = tuple.toString().split(",");
-      Assert.assertTrue("i=" + i, Integer.parseInt(pojoEvent[0]) == i ? true : 
false);
-      i++;
-    }
-    sink1.collectedTuples.clear();
-
-    insertEventsInTable(10, 10);
-
-    AbstractJdbcPollInputOperator<Object> pollableInstance = 
itr.next().getPartitionedInstance();
-
-    pollableInstance.outputPort.setSink(sink1);
-
-    pollableInstance.beginWindow(1);
-    Thread.sleep(pollableInstance.getPollInterval());
-    pollableInstance.emitTuples();
-    pollableInstance.endWindow();
-    
-
-    Assert.assertEquals("rows from db", 10, sink1.collectedTuples.size());
-    i = 10;
-    for (Object tuple : sink1.collectedTuples) {
-      String[] pojoEvent = tuple.toString().split(",");
-      Assert.assertTrue("i=" + i, Integer.parseInt(pojoEvent[0]) == i ? true : 
false);
-      i++;
-    }
-
-    sink1.collectedTuples.clear();
-    insertEventsInTable(10, 20);
-
-    pollableInstance.beginWindow(2);
-    Thread.sleep(pollableInstance.getPollInterval());
-    pollableInstance.emitTuples();
-    pollableInstance.endWindow();
-    
-    Assert.assertEquals("rows from db", 10, sink1.collectedTuples.size());
-
-    i = 20;
-    for (Object tuple : sink1.collectedTuples) {
-      String[] pojoEvent = tuple.toString().split(",");
-      Assert.assertTrue("i=" + i, Integer.parseInt(pojoEvent[0]) == i ? true : 
false);
-      i++;
-    }
-    sink1.collectedTuples.clear();
-  }
-
-}

Reply via email to