Repository: apex-malhar Updated Branches: refs/heads/master 3316d6a78 -> 26fa9d781
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java index 6c7e7d4..1ffe256 100644 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java @@ -22,48 +22,28 @@ import java.sql.Connection; import java.sql.Date; import java.sql.DriverManager; import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.sql.Time; import java.sql.Timestamp; -import java.sql.Types; -import java.util.List; -import javax.annotation.Nonnull; - -import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Test; - -import com.google.common.collect.Lists; - -import com.datatorrent.api.Attribute; -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.lib.helper.OperatorContextTestHelper; -import com.datatorrent.lib.helper.TestPortContext; -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.FieldInfo; -import com.datatorrent.lib.util.TestUtils; + import com.datatorrent.netlet.util.DTThrowable; -/** - * Tests for {@link AbstractJdbcTransactionableOutputOperator} and {@link AbstractJdbcInputOperator} - */ public class JdbcOperatorTest { public static final String DB_DRIVER = "org.hsqldb.jdbcDriver"; public static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; - private static final String TABLE_NAME = "test_event_table"; - private static final String TABLE_POJO_NAME = "test_pojo_event_table"; - private static final String TABLE_POJO_NAME_ID_DIFF = "test_pojo_event_table_id_diff"; - private static final String TABLE_POJO_NAME_NAME_DIFF = "test_pojo_event_table_name_diff"; - private static String APP_ID = "JdbcOperatorTest"; - private static int OPERATOR_ID = 0; + protected static final String TABLE_NAME = "test_event_table"; + protected static final String TABLE_POJO_NAME = "test_pojo_event_table"; + protected static final String TABLE_POJO_NAME_ID_DIFF = "test_pojo_event_table_id_diff"; + protected static final String TABLE_POJO_NAME_NAME_DIFF = "test_pojo_event_table_name_diff"; + protected static String APP_ID = "JdbcOperatorTest"; + protected static int OPERATOR_ID = 0; - private static class TestEvent + public static class TestEvent { int id; @@ -151,6 +131,15 @@ public class JdbcOperatorTest { this.score = score; } + + @Override + public String toString() + { + return "TestPOJOEvent [id=" + id + ", name=" + name + ", startDate=" + startDate + ", startTime=" + startTime + + ", startTimestamp=" + startTimestamp + ", score=" + score + "]"; + } + + } @BeforeClass @@ -162,25 +151,18 @@ public class JdbcOperatorTest Connection con = DriverManager.getConnection(URL); Statement stmt = con.createStatement(); - String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " - + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " - + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " - + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " - + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " - + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " - + ")"; + String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " + ")"; stmt.executeUpdate(createMetaTable); String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (ID INTEGER)"; stmt.executeUpdate(createTable); String createPOJOTable = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME + "(id INTEGER not NULL,name VARCHAR(255),startDate DATE,startTime TIME,startTimestamp TIMESTAMP, score DOUBLE, PRIMARY KEY ( id ))"; + stmt.executeUpdate(createPOJOTable); - String createPOJOTableIdDiff = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME_ID_DIFF - + "(id1 INTEGER not NULL,name VARCHAR(255), PRIMARY KEY ( id1 ))"; + String createPOJOTableIdDiff = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME_ID_DIFF + "(id1 INTEGER not NULL,name VARCHAR(255), PRIMARY KEY ( id1 ))"; stmt.executeUpdate(createPOJOTableIdDiff); - String createPOJOTableNameDiff = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME_NAME_DIFF - + "(id INTEGER not NULL,name1 VARCHAR(255), PRIMARY KEY ( id ))"; + String createPOJOTableNameDiff = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME_NAME_DIFF + "(id INTEGER not NULL,name1 VARCHAR(255), PRIMARY KEY ( id ))"; stmt.executeUpdate(createPOJOTableNameDiff); } catch (Throwable e) { DTThrowable.rethrow(e); @@ -222,569 +204,7 @@ public class JdbcOperatorTest } } - private static class TestOutputOperator extends AbstractJdbcTransactionableOutputOperator<TestEvent> - { - private static final String INSERT_STMT = "INSERT INTO " + TABLE_NAME + " values (?)"; - - TestOutputOperator() - { - cleanTable(); - } - - @Nonnull - @Override - protected String getUpdateCommand() - { - return INSERT_STMT; - } - - @Override - protected void setStatementParameters(PreparedStatement statement, TestEvent tuple) throws SQLException - { - statement.setInt(1, tuple.id); - } - - public int getNumOfEventsInStore() - { - Connection con; - try { - con = DriverManager.getConnection(URL); - Statement stmt = con.createStatement(); - - String countQuery = "SELECT count(*) from " + TABLE_NAME; - ResultSet resultSet = stmt.executeQuery(countQuery); - resultSet.next(); - return resultSet.getInt(1); - } catch (SQLException e) { - throw new RuntimeException("fetching count", e); - } - } - } - - private static class TestPOJOOutputOperator extends JdbcPOJOInsertOutputOperator - { - TestPOJOOutputOperator() - { - cleanTable(); - } - - public int getNumOfEventsInStore(String tableName) - { - Connection con; - try { - con = DriverManager.getConnection(URL); - Statement stmt = con.createStatement(); - - String countQuery = "SELECT count(*) from " + tableName; - ResultSet resultSet = stmt.executeQuery(countQuery); - resultSet.next(); - return resultSet.getInt(1); - } catch (SQLException e) { - throw new RuntimeException("fetching count", e); - } - } - - public int getNumOfNullEventsInStore(String tableName) - { - Connection con; - try { - con = DriverManager.getConnection(URL); - Statement stmt = con.createStatement(); - - String countQuery = "SELECT count(*) from " + tableName + " where name1 is null"; - ResultSet resultSet = stmt.executeQuery(countQuery); - resultSet.next(); - return resultSet.getInt(1); - } catch (SQLException e) { - throw new RuntimeException("fetching count", e); - } - } - - } - - private static class TestPOJONonInsertOutputOperator extends JdbcPOJONonInsertOutputOperator - { - public TestPOJONonInsertOutputOperator() - { - cleanTable(); - } - - public int getNumOfEventsInStore() - { - Connection con; - try { - con = DriverManager.getConnection(URL); - Statement stmt = con.createStatement(); - - String countQuery = "SELECT count(*) from " + TABLE_POJO_NAME; - ResultSet resultSet = stmt.executeQuery(countQuery); - resultSet.next(); - return resultSet.getInt(1); - } catch (SQLException e) { - throw new RuntimeException("fetching count", e); - } - } - - public int getDistinctNonUnique() - { - Connection con; - try { - con = DriverManager.getConnection(URL); - Statement stmt = con.createStatement(); - - String countQuery = "SELECT count(distinct(name)) from " + TABLE_POJO_NAME; - ResultSet resultSet = stmt.executeQuery(countQuery); - resultSet.next(); - return resultSet.getInt(1); - } catch (SQLException e) { - throw new RuntimeException("fetching count", e); - } - } - } - - private static class TestInputOperator extends AbstractJdbcInputOperator<TestEvent> - { - - private static final String retrieveQuery = "SELECT * FROM " + TABLE_NAME; - - TestInputOperator() - { - cleanTable(); - } - - @Override - public TestEvent getTuple(ResultSet result) - { - try { - return new TestEvent(result.getInt(1)); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - @Override - public String queryToRetrieveData() - { - return retrieveQuery; - } - } - - @Test - public void testJdbcOutputOperator() - { - JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore(); - transactionalStore.setDatabaseDriver(DB_DRIVER); - transactionalStore.setDatabaseUrl(URL); - - com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = - new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); - attributeMap.put(DAG.APPLICATION_ID, APP_ID); - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( - OPERATOR_ID, attributeMap); - - TestOutputOperator outputOperator = new TestOutputOperator(); - outputOperator.setBatchSize(3); - outputOperator.setStore(transactionalStore); - - outputOperator.setup(context); - - outputOperator.activate(context); - List<TestEvent> events = Lists.newArrayList(); - for (int i = 0; i < 10; i++) { - events.add(new TestEvent(i)); - } - - outputOperator.beginWindow(0); - for (TestEvent event : events) { - outputOperator.input.process(event); - } - outputOperator.endWindow(); - - Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore()); - cleanTable(); - } - - @Test - public void testJdbcPojoOutputOperator() - { - JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore(); - transactionalStore.setDatabaseDriver(DB_DRIVER); - transactionalStore.setDatabaseUrl(URL); - - com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = - new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); - attributeMap.put(DAG.APPLICATION_ID, APP_ID); - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( - OPERATOR_ID, attributeMap); - - TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator(); - outputOperator.setBatchSize(3); - outputOperator.setTablename(TABLE_POJO_NAME); - - List<JdbcFieldInfo> fieldInfos = Lists.newArrayList(); - fieldInfos.add(new JdbcFieldInfo("ID", "id", null, Types.INTEGER)); - fieldInfos.add(new JdbcFieldInfo("NAME", "name", null, Types.VARCHAR)); - outputOperator.setFieldInfos(fieldInfos); - - outputOperator.setStore(transactionalStore); - - outputOperator.setup(context); - - Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); - portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); - TestPortContext tpc = new TestPortContext(portAttributes); - outputOperator.input.setup(tpc); - - outputOperator.activate(context); - - List<TestPOJOEvent> events = Lists.newArrayList(); - for (int i = 0; i < 10; i++) { - events.add(new TestPOJOEvent(i, "test" + i)); - } - - outputOperator.beginWindow(0); - for (TestPOJOEvent event : events) { - outputOperator.input.process(event); - } - outputOperator.endWindow(); - - Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME)); - } - - /** - * This test will assume direct mapping for POJO fields to DB columns - * All fields in DB present in POJO - */ - @Test - public void testJdbcPojoInsertOutputOperator() - { - JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore(); - transactionalStore.setDatabaseDriver(DB_DRIVER); - transactionalStore.setDatabaseUrl(URL); - - com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = - new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); - attributeMap.put(DAG.APPLICATION_ID, APP_ID); - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( - OPERATOR_ID, attributeMap); - - TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator(); - outputOperator.setBatchSize(3); - outputOperator.setTablename(TABLE_POJO_NAME); - - outputOperator.setStore(transactionalStore); - - outputOperator.setup(context); - - Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); - portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); - TestPortContext tpc = new TestPortContext(portAttributes); - outputOperator.input.setup(tpc); - - CollectorTestSink<Object> errorSink = new CollectorTestSink<>(); - TestUtils.setSink(outputOperator.error, errorSink); - - outputOperator.activate(context); - - List<TestPOJOEvent> events = Lists.newArrayList(); - for (int i = 0; i < 10; i++) { - events.add(new TestPOJOEvent(i, "test" + i)); - } - events.add(new TestPOJOEvent(0, "test0")); // Records violating PK constraint - events.add(new TestPOJOEvent(2, "test2")); // Records violating PK constraint - events.add(new TestPOJOEvent(10, "test10")); // Clean record - events.add(new TestPOJOEvent(11, "test11")); // Clean record - events.add(new TestPOJOEvent(3, "test3")); // Records violating PK constraint - events.add(new TestPOJOEvent(12, "test12")); // Clean record - - outputOperator.beginWindow(0); - for (TestPOJOEvent event : events) { - outputOperator.input.process(event); - } - outputOperator.endWindow(); - - Assert.assertEquals("rows in db", 13, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME)); - Assert.assertEquals("Error tuples", 3, errorSink.collectedTuples.size()); - } - - /** - * This test will assume direct mapping for POJO fields to DB columns - * Nullable DB field missing in POJO - * name1 field, which is nullable in DB is missing from POJO - * POJO(id, name) -> DB(id, name1) - */ - @Test - public void testJdbcPojoInsertOutputOperatorNullName() - { - JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore(); - transactionalStore.setDatabaseDriver(DB_DRIVER); - transactionalStore.setDatabaseUrl(URL); - - com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = - new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); - attributeMap.put(DAG.APPLICATION_ID, APP_ID); - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( - OPERATOR_ID, attributeMap); - - TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator(); - outputOperator.setBatchSize(3); - outputOperator.setTablename(TABLE_POJO_NAME_NAME_DIFF); - - outputOperator.setStore(transactionalStore); - - outputOperator.setup(context); - - Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); - portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); - TestPortContext tpc = new TestPortContext(portAttributes); - outputOperator.input.setup(tpc); - - outputOperator.activate(context); - - List<TestPOJOEvent> events = Lists.newArrayList(); - for (int i = 0; i < 10; i++) { - events.add(new TestPOJOEvent(i, "test" + i)); - } - - outputOperator.beginWindow(0); - for (TestPOJOEvent event : events) { - outputOperator.input.process(event); - } - outputOperator.endWindow(); - - Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME_NAME_DIFF)); - Assert.assertEquals("null name rows in db", 10, - outputOperator.getNumOfNullEventsInStore(TABLE_POJO_NAME_NAME_DIFF)); - } - - /** - * This test will assume direct mapping for POJO fields to DB columns. - * Non-Nullable DB field missing in POJO - * id1 field which is non-nullable in DB is missing from POJO - * POJO(id, name) -> DB(id1, name) - */ - @Test - public void testJdbcPojoInsertOutputOperatorNullId() - { - JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore(); - transactionalStore.setDatabaseDriver(DB_DRIVER); - transactionalStore.setDatabaseUrl(URL); - - com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = - new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); - attributeMap.put(DAG.APPLICATION_ID, APP_ID); - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( - OPERATOR_ID, attributeMap); - - TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator(); - outputOperator.setBatchSize(3); - outputOperator.setTablename(TABLE_POJO_NAME_ID_DIFF); - - outputOperator.setStore(transactionalStore); - - outputOperator.setup(context); - - Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); - portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); - TestPortContext tpc = new TestPortContext(portAttributes); - outputOperator.input.setup(tpc); - - boolean exceptionOccurred = false; - try { - outputOperator.activate(context); - } catch (Exception e) { - exceptionOccurred = true; - Assert.assertTrue(e instanceof RuntimeException); - Assert.assertTrue(e.getMessage().toLowerCase().contains("id1 not found in pojo")); - } - Assert.assertTrue(exceptionOccurred); - } - - @Test - public void testJdbcPojoOutputOperatorMerge() - { - JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore(); - transactionalStore.setDatabaseDriver(DB_DRIVER); - transactionalStore.setDatabaseUrl(URL); - - com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = - new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); - attributeMap.put(DAG.APPLICATION_ID, APP_ID); - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( - OPERATOR_ID, attributeMap); - - TestPOJONonInsertOutputOperator updateOperator = new TestPOJONonInsertOutputOperator(); - updateOperator.setBatchSize(3); - - updateOperator.setStore(transactionalStore); - - updateOperator.setSqlStatement("MERGE INTO " + TABLE_POJO_NAME + " AS T USING (VALUES (?, ?)) AS FOO(id, name) " - + "ON T.id = FOO.id " - + "WHEN MATCHED THEN UPDATE SET name = FOO.name " - + "WHEN NOT MATCHED THEN INSERT( id, name ) VALUES (FOO.id, FOO.name);"); - - List<JdbcFieldInfo> fieldInfos = Lists.newArrayList(); - fieldInfos.add(new JdbcFieldInfo("id", "id", null, Types.INTEGER)); - fieldInfos.add(new JdbcFieldInfo("name", "name", null, Types.VARCHAR)); - updateOperator.setFieldInfos(fieldInfos); - updateOperator.setup(context); - - Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); - portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); - TestPortContext tpc = new TestPortContext(portAttributes); - updateOperator.input.setup(tpc); - - updateOperator.activate(context); - - List<TestPOJOEvent> events = Lists.newArrayList(); - for (int i = 0; i < 10; i++) { - events.add(new TestPOJOEvent(i, "test" + i)); - } - for (int i = 0; i < 5; i++) { - events.add(new TestPOJOEvent(i, "test" + 100)); - } - - updateOperator.getDistinctNonUnique(); - updateOperator.beginWindow(0); - for (TestPOJOEvent event : events) { - updateOperator.input.process(event); - } - updateOperator.endWindow(); - - // Expect 10 unique ids: 0 - 9 - Assert.assertEquals("rows in db", 10, updateOperator.getNumOfEventsInStore()); - // Expect 6 unique name: test-100, test-5, test-6, test-7, test-8, test-9 - Assert.assertEquals("rows in db", 6, updateOperator.getDistinctNonUnique()); - } - - @Test - public void testJdbcInputOperator() - { - JdbcStore store = new JdbcStore(); - store.setDatabaseDriver(DB_DRIVER); - store.setDatabaseUrl(URL); - - com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = - new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); - attributeMap.put(DAG.APPLICATION_ID, APP_ID); - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( - OPERATOR_ID, attributeMap); - - TestInputOperator inputOperator = new TestInputOperator(); - inputOperator.setStore(store); - insertEventsInTable(10); - - CollectorTestSink<Object> sink = new CollectorTestSink<>(); - inputOperator.outputPort.setSink(sink); - - inputOperator.setup(context); - inputOperator.beginWindow(0); - inputOperator.emitTuples(); - inputOperator.endWindow(); - - Assert.assertEquals("rows from db", 10, sink.collectedTuples.size()); - } - - @Test - public void testJdbcPojoInputOperator() - { - JdbcStore store = new JdbcStore(); - store.setDatabaseDriver(DB_DRIVER); - store.setDatabaseUrl(URL); - - Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); - attributeMap.put(DAG.APPLICATION_ID, APP_ID); - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( - OPERATOR_ID, attributeMap); - - insertEvents(10,true, 0); - - JdbcPOJOInputOperator inputOperator = new JdbcPOJOInputOperator(); - inputOperator.setStore(store); - inputOperator.setTableName(TABLE_POJO_NAME); - - List<FieldInfo> fieldInfos = Lists.newArrayList(); - fieldInfos.add(new FieldInfo("ID", "id", null)); - fieldInfos.add(new FieldInfo("STARTDATE", "startDate", null)); - fieldInfos.add(new FieldInfo("STARTTIME", "startTime", null)); - fieldInfos.add(new FieldInfo("STARTTIMESTAMP", "startTimestamp", null)); - fieldInfos.add(new FieldInfo("SCORE", "score", FieldInfo.SupportType.DOUBLE)); - inputOperator.setFieldInfos(fieldInfos); - - inputOperator.setFetchSize(5); - - CollectorTestSink<Object> sink = new CollectorTestSink<>(); - inputOperator.outputPort.setSink(sink); - - Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); - portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); - TestPortContext tpc = new TestPortContext(portAttributes); - - inputOperator.setup(context); - inputOperator.outputPort.setup(tpc); - - inputOperator.activate(context); - - inputOperator.beginWindow(0); - inputOperator.emitTuples(); - inputOperator.endWindow(); - - Assert.assertEquals("rows from db", 5, sink.collectedTuples.size()); - int i = 0; - for (Object tuple : sink.collectedTuples) { - TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple; - Assert.assertTrue("i=" + i, pojoEvent.getId() == i); - Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date); - Assert.assertTrue("time", pojoEvent.getStartTime() instanceof Time); - Assert.assertTrue("timestamp", pojoEvent.getStartTimestamp() instanceof Timestamp); - i++; - } - sink.collectedTuples.clear(); - - inputOperator.beginWindow(1); - inputOperator.emitTuples(); - inputOperator.endWindow(); - - Assert.assertEquals("rows from db", 5, sink.collectedTuples.size()); - for (Object tuple : sink.collectedTuples) { - TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple; - Assert.assertTrue("i=" + i, pojoEvent.getId() == i); - Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date); - Assert.assertTrue("time", pojoEvent.getStartTime() instanceof Time); - Assert.assertTrue("timestamp", pojoEvent.getStartTimestamp() instanceof Timestamp); - Assert.assertTrue("score", pojoEvent.getScore() == 55.4); - i++; - } - - sink.collectedTuples.clear(); - - inputOperator.beginWindow(2); - inputOperator.emitTuples(); - inputOperator.endWindow(); - - Assert.assertEquals("rows from db", 0, sink.collectedTuples.size()); - - // Insert 3 more tuples and check if they are read successfully. - insertEvents(3, false, 10); - - inputOperator.beginWindow(3); - inputOperator.emitTuples(); - inputOperator.endWindow(); - - Assert.assertEquals("rows from db", 3, sink.collectedTuples.size()); - for (Object tuple : sink.collectedTuples) { - TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple; - Assert.assertTrue("i=" + i, pojoEvent.getId() == i); - Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date); - Assert.assertTrue("time", pojoEvent.getStartTime() instanceof Time); - Assert.assertTrue("timestamp", pojoEvent.getStartTimestamp() instanceof Timestamp); - Assert.assertTrue("score", pojoEvent.getScore() == 55.4); - i++; - } - } - - - private void insertEvents(int numEvents, boolean cleanExistingRows, int startRowId) + protected static void insertEvents(int numEvents, boolean cleanExistingRows, int startRowId) { try (Connection con = DriverManager.getConnection(URL); Statement stmt = con.createStatement()) { if (cleanExistingRows) { @@ -798,13 +218,14 @@ public class JdbcOperatorTest for (int i = 0; i < numEvents; i++) { pStmt.setInt(1, startRowId + i); - pStmt.setString(2, "name"); + pStmt.setString(2, "name" + i); pStmt.setDate(3, new Date(2016, 1, 1)); pStmt.setTime(4, new Time(2016, 1, 1)); pStmt.setTimestamp(5, new Timestamp(2016, 1, 1, 0, 0, 0, 0)); pStmt.setDouble(6, new Double(55.4)); pStmt.executeUpdate(); } + } catch (SQLException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java new file mode 100644 index 0000000..e6d8b42 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java @@ -0,0 +1,606 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.db.jdbc; + +import java.sql.Connection; +import java.sql.Date; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.List; + +import javax.annotation.Nonnull; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.helper.TestPortContext; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.TestUtils; + +/** + * Tests for {@link AbstractJdbcTransactionableOutputOperator} and + * {@link AbstractJdbcInputOperator} + */ +public class JdbcPojoOperatorTest extends JdbcOperatorTest +{ + + private static class TestOutputOperator extends AbstractJdbcTransactionableOutputOperator<TestEvent> + { + private static final String INSERT_STMT = "INSERT INTO " + TABLE_NAME + " values (?)"; + + TestOutputOperator() + { + cleanTable(); + } + + @Nonnull + @Override + protected String getUpdateCommand() + { + return INSERT_STMT; + } + + @Override + protected void setStatementParameters(PreparedStatement statement, TestEvent tuple) throws SQLException + { + statement.setInt(1, tuple.id); + } + + public int getNumOfEventsInStore() + { + Connection con; + try { + con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + + String countQuery = "SELECT count(*) from " + TABLE_NAME; + ResultSet resultSet = stmt.executeQuery(countQuery); + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException e) { + throw new RuntimeException("fetching count", e); + } + } + } + + private static class TestInputOperator extends AbstractJdbcInputOperator<TestEvent> + { + + private static final String retrieveQuery = "SELECT * FROM " + TABLE_NAME; + + TestInputOperator() + { + cleanTable(); + } + + @Override + public TestEvent getTuple(ResultSet result) + { + try { + return new TestEvent(result.getInt(1)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public String queryToRetrieveData() + { + return retrieveQuery; + } + } + + private static class TestPOJOOutputOperator extends JdbcPOJOInsertOutputOperator + { + TestPOJOOutputOperator() + { + cleanTable(); + } + + public int getNumOfEventsInStore(String tableName) + { + Connection con; + try { + con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + + String countQuery = "SELECT count(*) from " + tableName; + ResultSet resultSet = stmt.executeQuery(countQuery); + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException e) { + throw new RuntimeException("fetching count", e); + } + } + + public int getNumOfNullEventsInStore(String tableName) + { + Connection con; + try { + con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + + String countQuery = "SELECT count(*) from " + tableName + " where name1 is null"; + ResultSet resultSet = stmt.executeQuery(countQuery); + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException e) { + throw new RuntimeException("fetching count", e); + } + } + + private static class TestPOJONonInsertOutputOperator extends JdbcPOJONonInsertOutputOperator + { + public TestPOJONonInsertOutputOperator() + { + cleanTable(); + } + + public int getNumOfEventsInStore() + { + Connection con; + try { + con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + + String countQuery = "SELECT count(*) from " + TABLE_POJO_NAME; + ResultSet resultSet = stmt.executeQuery(countQuery); + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException e) { + throw new RuntimeException("fetching count", e); + } + } + + public int getDistinctNonUnique() + { + Connection con; + try { + con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + + String countQuery = "SELECT count(distinct(name)) from " + TABLE_POJO_NAME; + ResultSet resultSet = stmt.executeQuery(countQuery); + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException e) { + throw new RuntimeException("fetching count", e); + } + } + } + } + + @Test + public void testJdbcOutputOperator() + { + JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore(); + transactionalStore.setDatabaseDriver(DB_DRIVER); + transactionalStore.setDatabaseUrl(URL); + + com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); + + TestOutputOperator outputOperator = new TestOutputOperator(); + outputOperator.setBatchSize(3); + outputOperator.setStore(transactionalStore); + + outputOperator.setup(context); + + outputOperator.activate(context); + List<TestEvent> events = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + events.add(new TestEvent(i)); + } + + outputOperator.beginWindow(0); + for (TestEvent event : events) { + outputOperator.input.process(event); + } + outputOperator.endWindow(); + + Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore()); + cleanTable(); + } + + @Test + public void testJdbcPojoOutputOperator() + { + JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore(); + transactionalStore.setDatabaseDriver(DB_DRIVER); + transactionalStore.setDatabaseUrl(URL); + + com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); + + TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator(); + outputOperator.setBatchSize(3); + outputOperator.setTablename(TABLE_POJO_NAME); + + List<JdbcFieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new JdbcFieldInfo("ID", "id", null, Types.INTEGER)); + fieldInfos.add(new JdbcFieldInfo("NAME", "name", null, Types.VARCHAR)); + outputOperator.setFieldInfos(fieldInfos); + + outputOperator.setStore(transactionalStore); + + outputOperator.setup(context); + + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + TestPortContext tpc = new TestPortContext(portAttributes); + outputOperator.input.setup(tpc); + + outputOperator.activate(context); + + List<TestPOJOEvent> events = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + events.add(new TestPOJOEvent(i, "test" + i)); + } + + outputOperator.beginWindow(0); + for (TestPOJOEvent event : events) { + outputOperator.input.process(event); + } + outputOperator.endWindow(); + + Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME)); + } + + /** + * This test will assume direct mapping for POJO fields to DB columns All + * fields in DB present in POJO + */ + @Test + public void testJdbcPojoInsertOutputOperator() + { + JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore(); + transactionalStore.setDatabaseDriver(DB_DRIVER); + transactionalStore.setDatabaseUrl(URL); + + com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); + + TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator(); + outputOperator.setBatchSize(3); + outputOperator.setTablename(TABLE_POJO_NAME); + + outputOperator.setStore(transactionalStore); + + outputOperator.setup(context); + + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + TestPortContext tpc = new TestPortContext(portAttributes); + outputOperator.input.setup(tpc); + + CollectorTestSink<Object> errorSink = new CollectorTestSink<>(); + TestUtils.setSink(outputOperator.error, errorSink); + + outputOperator.activate(context); + + List<TestPOJOEvent> events = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + events.add(new TestPOJOEvent(i, "test" + i)); + } + events.add(new TestPOJOEvent(0, "test0")); // Records violating PK constraint + events.add(new TestPOJOEvent(2, "test2")); // Records violating PK constraint + events.add(new TestPOJOEvent(10, "test10")); // Clean record + events.add(new TestPOJOEvent(11, "test11")); // Clean record + events.add(new TestPOJOEvent(3, "test3")); // Records violating PK constraint + events.add(new TestPOJOEvent(12, "test12")); // Clean record + + outputOperator.beginWindow(0); + for (TestPOJOEvent event : events) { + outputOperator.input.process(event); + } + outputOperator.endWindow(); + + Assert.assertEquals("rows in db", 13, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME)); + Assert.assertEquals("Error tuples", 3, errorSink.collectedTuples.size()); + } + + /** + * This test will assume direct mapping for POJO fields to DB columns Nullable + * DB field missing in POJO name1 field, which is nullable in DB is missing + * from POJO POJO(id, name) -> DB(id, name1) + */ + @Test + public void testJdbcPojoInsertOutputOperatorNullName() + { + JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore(); + transactionalStore.setDatabaseDriver(DB_DRIVER); + transactionalStore.setDatabaseUrl(URL); + + com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); + + TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator(); + outputOperator.setBatchSize(3); + outputOperator.setTablename(TABLE_POJO_NAME_NAME_DIFF); + + outputOperator.setStore(transactionalStore); + + outputOperator.setup(context); + + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + TestPortContext tpc = new TestPortContext(portAttributes); + outputOperator.input.setup(tpc); + + outputOperator.activate(context); + + List<TestPOJOEvent> events = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + events.add(new TestPOJOEvent(i, "test" + i)); + } + + outputOperator.beginWindow(0); + for (TestPOJOEvent event : events) { + outputOperator.input.process(event); + } + outputOperator.endWindow(); + + Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME_NAME_DIFF)); + Assert + .assertEquals("null name rows in db", 10, outputOperator.getNumOfNullEventsInStore(TABLE_POJO_NAME_NAME_DIFF)); + } + + /** + * This test will assume direct mapping for POJO fields to DB columns. + * Non-Nullable DB field missing in POJO id1 field which is non-nullable in DB + * is missing from POJO POJO(id, name) -> DB(id1, name) + */ + @Test + public void testJdbcPojoInsertOutputOperatorNullId() + { + JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore(); + transactionalStore.setDatabaseDriver(DB_DRIVER); + transactionalStore.setDatabaseUrl(URL); + + com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); + + TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator(); + outputOperator.setBatchSize(3); + outputOperator.setTablename(TABLE_POJO_NAME_ID_DIFF); + + outputOperator.setStore(transactionalStore); + + outputOperator.setup(context); + + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + TestPortContext tpc = new TestPortContext(portAttributes); + outputOperator.input.setup(tpc); + + boolean exceptionOccurred = false; + try { + outputOperator.activate(context); + } catch (Exception e) { + exceptionOccurred = true; + Assert.assertTrue(e instanceof RuntimeException); + Assert.assertTrue(e.getMessage().toLowerCase().contains("id1 not found in pojo")); + } + Assert.assertTrue(exceptionOccurred); + } + + @Test + public void testJdbcPojoOutputOperatorMerge() + { + JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore(); + transactionalStore.setDatabaseDriver(DB_DRIVER); + transactionalStore.setDatabaseUrl(URL); + + com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); + + TestPOJOOutputOperator.TestPOJONonInsertOutputOperator updateOperator = new TestPOJOOutputOperator.TestPOJONonInsertOutputOperator(); + updateOperator.setBatchSize(3); + + updateOperator.setStore(transactionalStore); + + updateOperator.setSqlStatement("MERGE INTO " + TABLE_POJO_NAME + " AS T USING (VALUES (?, ?)) AS FOO(id, name) " + + "ON T.id = FOO.id " + "WHEN MATCHED THEN UPDATE SET name = FOO.name " + + "WHEN NOT MATCHED THEN INSERT( id, name ) VALUES (FOO.id, FOO.name);"); + + List<JdbcFieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new JdbcFieldInfo("id", "id", null, Types.INTEGER)); + fieldInfos.add(new JdbcFieldInfo("name", "name", null, Types.VARCHAR)); + updateOperator.setFieldInfos(fieldInfos); + updateOperator.setup(context); + + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + TestPortContext tpc = new TestPortContext(portAttributes); + updateOperator.input.setup(tpc); + + updateOperator.activate(context); + + List<TestPOJOEvent> events = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + events.add(new TestPOJOEvent(i, "test" + i)); + } + for (int i = 0; i < 5; i++) { + events.add(new TestPOJOEvent(i, "test" + 100)); + } + + updateOperator.getDistinctNonUnique(); + updateOperator.beginWindow(0); + for (TestPOJOEvent event : events) { + updateOperator.input.process(event); + } + updateOperator.endWindow(); + + // Expect 10 unique ids: 0 - 9 + Assert.assertEquals("rows in db", 10, updateOperator.getNumOfEventsInStore()); + // Expect 6 unique name: test-100, test-5, test-6, test-7, test-8, test-9 + Assert.assertEquals("rows in db", 6, updateOperator.getDistinctNonUnique()); + } + + @Test + public void testJdbcInputOperator() + { + JdbcStore store = new JdbcStore(); + store.setDatabaseDriver(DB_DRIVER); + store.setDatabaseUrl(URL); + + com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); + + TestInputOperator inputOperator = new TestInputOperator(); + inputOperator.setStore(store); + insertEventsInTable(10); + + CollectorTestSink<Object> sink = new CollectorTestSink<>(); + inputOperator.outputPort.setSink(sink); + + inputOperator.setup(context); + inputOperator.beginWindow(0); + inputOperator.emitTuples(); + inputOperator.endWindow(); + + Assert.assertEquals("rows from db", 10, sink.collectedTuples.size()); + } + + @Test + public void testJdbcPojoInputOperator() + { + JdbcStore store = new JdbcStore(); + store.setDatabaseDriver(DB_DRIVER); + store.setDatabaseUrl(URL); + + Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); + + insertEvents(10, true, 0); + + JdbcPOJOInputOperator inputOperator = new JdbcPOJOInputOperator(); + inputOperator.setStore(store); + inputOperator.setTableName(TABLE_POJO_NAME); + + List<FieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new FieldInfo("ID", "id", null)); + fieldInfos.add(new FieldInfo("STARTDATE", "startDate", null)); + fieldInfos.add(new FieldInfo("STARTTIME", "startTime", null)); + fieldInfos.add(new FieldInfo("STARTTIMESTAMP", "startTimestamp", null)); + fieldInfos.add(new FieldInfo("SCORE", "score", FieldInfo.SupportType.DOUBLE)); + inputOperator.setFieldInfos(fieldInfos); + + inputOperator.setFetchSize(5); + + CollectorTestSink<Object> sink = new CollectorTestSink<>(); + inputOperator.outputPort.setSink(sink); + + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + TestPortContext tpc = new TestPortContext(portAttributes); + + inputOperator.setup(context); + inputOperator.outputPort.setup(tpc); + + inputOperator.activate(context); + + inputOperator.beginWindow(0); + inputOperator.emitTuples(); + inputOperator.endWindow(); + + Assert.assertEquals("rows from db", 5, sink.collectedTuples.size()); + int i = 0; + for (Object tuple : sink.collectedTuples) { + TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple; + Assert.assertTrue("i=" + i, pojoEvent.getId() == i); + Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date); + Assert.assertTrue("time", pojoEvent.getStartTime() instanceof Time); + Assert.assertTrue("timestamp", pojoEvent.getStartTimestamp() instanceof Timestamp); + i++; + } + sink.collectedTuples.clear(); + + inputOperator.beginWindow(1); + inputOperator.emitTuples(); + inputOperator.endWindow(); + + Assert.assertEquals("rows from db", 5, sink.collectedTuples.size()); + for (Object tuple : sink.collectedTuples) { + TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple; + Assert.assertTrue("i=" + i, pojoEvent.getId() == i); + Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date); + Assert.assertTrue("time", pojoEvent.getStartTime() instanceof Time); + Assert.assertTrue("timestamp", pojoEvent.getStartTimestamp() instanceof Timestamp); + Assert.assertTrue("score", pojoEvent.getScore() == 55.4); + i++; + } + + sink.collectedTuples.clear(); + + inputOperator.beginWindow(2); + inputOperator.emitTuples(); + inputOperator.endWindow(); + + Assert.assertEquals("rows from db", 0, sink.collectedTuples.size()); + + // Insert 3 more tuples and check if they are read successfully. + insertEvents(3, false, 10); + + inputOperator.beginWindow(3); + inputOperator.emitTuples(); + inputOperator.endWindow(); + + Assert.assertEquals("rows from db", 3, sink.collectedTuples.size()); + for (Object tuple : sink.collectedTuples) { + TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple; + Assert.assertTrue("i=" + i, pojoEvent.getId() == i); + Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date); + Assert.assertTrue("time", pojoEvent.getStartTime() instanceof Time); + Assert.assertTrue("timestamp", pojoEvent.getStartTimestamp() instanceof Timestamp); + Assert.assertTrue("score", pojoEvent.getScore() == 55.4); + i++; + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java new file mode 100644 index 0000000..2f3f356 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.db.jdbc; + +import java.io.File; +import java.io.IOException; +import java.sql.Date; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.MutablePair; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Partitioner; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.helper.TestPortContext; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.KeyValPair; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest +{ + public String dir = null; + @Mock + private ScheduledExecutorService mockscheduler; + @Mock + private ScheduledFuture futureTaskMock; + @Mock + private WindowDataManager windowDataManagerMock; + + @Before + public void beforeTest() + { + dir = "target/" + APP_ID + "/"; + + MockitoAnnotations.initMocks(this); + when(mockscheduler.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))) + .thenReturn(futureTaskMock); + } + + @After + public void afterTest() throws IOException + { + cleanTable(); + FileUtils.deleteDirectory(new File(dir)); + } + + @Test + public void testDBPoller() throws InterruptedException + { + insertEvents(10, true, 0); + + JdbcStore store = new JdbcStore(); + store.setDatabaseDriver(DB_DRIVER); + store.setDatabaseUrl(URL); + + List<FieldInfo> fieldInfos = getFieldInfos(); + + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + TestPortContext tpc = new TestPortContext(portAttributes); + + JdbcPOJOPollInputOperator inputOperator = new JdbcPOJOPollInputOperator(); + inputOperator.setStore(store); + inputOperator.setTableName(TABLE_POJO_NAME); + inputOperator.setKey("id"); + inputOperator.setFieldInfos(fieldInfos); + inputOperator.setFetchSize(100); + inputOperator.setBatchSize(100); + inputOperator.setPartitionCount(2); + + Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> newPartitions = inputOperator + .definePartitions(new ArrayList<Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>>(), null); + + int operatorId = 0; + for (com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>> partition : newPartitions) { + + Attribute.AttributeMap.DefaultAttributeMap partitionAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); + partitionAttributeMap.put(DAG.APPLICATION_ID, APP_ID); + partitionAttributeMap.put(Context.DAGContext.APPLICATION_PATH, dir); + + OperatorContextTestHelper.TestIdOperatorContext partitioningContext = new OperatorContextTestHelper.TestIdOperatorContext( + operatorId++, partitionAttributeMap); + + JdbcPOJOPollInputOperator parition = (JdbcPOJOPollInputOperator)partition.getPartitionedInstance(); + parition.outputPort.setup(tpc); + parition.setScheduledExecutorService(mockscheduler); + parition.setup(partitioningContext); + parition.activate(partitioningContext); + } + + Iterator<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> itr = newPartitions + .iterator(); + // First partition is for range queries,last is for polling queries + JdbcPOJOPollInputOperator firstInstance = (JdbcPOJOPollInputOperator)itr.next().getPartitionedInstance(); + CollectorTestSink<Object> sink1 = new CollectorTestSink<>(); + firstInstance.outputPort.setSink(sink1); + firstInstance.beginWindow(0); + firstInstance.pollRecords(); + firstInstance.pollRecords(); + firstInstance.emitTuples(); + firstInstance.endWindow(); + + Assert.assertEquals("rows from db", 5, sink1.collectedTuples.size()); + for (Object tuple : sink1.collectedTuples) { + TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple; + Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date); + Assert.assertTrue("date", pojoEvent.getId() < 5); + } + + JdbcPOJOPollInputOperator secondInstance = (JdbcPOJOPollInputOperator)itr.next().getPartitionedInstance(); + CollectorTestSink<Object> sink2 = new CollectorTestSink<>(); + secondInstance.outputPort.setSink(sink2); + secondInstance.beginWindow(0); + secondInstance.pollRecords(); + secondInstance.emitTuples(); + secondInstance.endWindow(); + + Assert.assertEquals("rows from db", 5, sink2.collectedTuples.size()); + for (Object tuple : sink2.collectedTuples) { + TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple; + Assert.assertTrue("date", pojoEvent.getId() < 10 && pojoEvent.getId() >= 5); + } + + insertEvents(4, false, 10); + JdbcPOJOPollInputOperator thirdInstance = (JdbcPOJOPollInputOperator)itr.next().getPartitionedInstance(); + CollectorTestSink<Object> sink3 = new CollectorTestSink<>(); + thirdInstance.outputPort.setSink(sink3); + thirdInstance.beginWindow(0); + thirdInstance.pollRecords(); + thirdInstance.emitTuples(); + thirdInstance.endWindow(); + + Assert.assertEquals("rows from db", 4, sink3.collectedTuples.size()); + } + + @Test + public void testRecovery() throws IOException + { + int operatorId = 1; + when(windowDataManagerMock.getLargestRecoveryWindow()).thenReturn(1L); + when(windowDataManagerMock.load(operatorId, 1)).thenReturn(new MutablePair<Integer, Integer>(0, 4)); + + insertEvents(10, true, 0); + + JdbcStore store = new JdbcStore(); + store.setDatabaseDriver(DB_DRIVER); + store.setDatabaseUrl(URL); + + List<FieldInfo> fieldInfos = getFieldInfos(); + + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + TestPortContext tpc = new TestPortContext(portAttributes); + + Attribute.AttributeMap.DefaultAttributeMap partitionAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); + partitionAttributeMap.put(DAG.APPLICATION_ID, APP_ID); + partitionAttributeMap.put(Context.DAGContext.APPLICATION_PATH, dir); + + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + operatorId, partitionAttributeMap); + + JdbcPOJOPollInputOperator inputOperator = new JdbcPOJOPollInputOperator(); + inputOperator.setStore(store); + inputOperator.setTableName(TABLE_POJO_NAME); + inputOperator.setKey("id"); + inputOperator.setFieldInfos(fieldInfos); + inputOperator.setFetchSize(100); + inputOperator.setBatchSize(100); + inputOperator.lastEmittedRow = 0; //setting as not calling partition logic + inputOperator.rangeQueryPair = new KeyValPair<Integer, Integer>(0, 8); + + inputOperator.outputPort.setup(tpc); + inputOperator.setScheduledExecutorService(mockscheduler); + inputOperator.setup(context); + inputOperator.setWindowManager(windowDataManagerMock); + inputOperator.activate(context); + + CollectorTestSink<Object> sink = new CollectorTestSink<>(); + inputOperator.outputPort.setSink(sink); + inputOperator.beginWindow(0); + verify(mockscheduler, times(0)).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); + inputOperator.emitTuples(); + inputOperator.endWindow(); + inputOperator.beginWindow(1); + verify(mockscheduler, times(1)).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); + + } + + private List<FieldInfo> getFieldInfos() + { + List<FieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new FieldInfo("ID", "id", null)); + fieldInfos.add(new FieldInfo("STARTDATE", "startDate", null)); + fieldInfos.add(new FieldInfo("STARTTIME", "startTime", null)); + fieldInfos.add(new FieldInfo("STARTTIMESTAMP", "startTimestamp", null)); + fieldInfos.add(new FieldInfo("NAME", "name", null)); + return fieldInfos; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/26fa9d78/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java deleted file mode 100644 index 573e45d..0000000 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java +++ /dev/null @@ -1,246 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.db.jdbc; - -import java.io.File; -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; - -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import org.apache.commons.io.FileUtils; - -import com.google.common.collect.Lists; - -import com.datatorrent.api.Attribute; -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DefaultPartition; -import com.datatorrent.api.Partitioner; -import com.datatorrent.lib.helper.OperatorContextTestHelper; -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.TestUtils; - -/** - * Tests for {@link AbstractJdbcPollInputOperator} and - * {@link JdbcPollInputOperator} - */ -public class JdbcPollerTest -{ - public static final String DB_DRIVER = "org.hsqldb.jdbcDriver"; - public static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; - - private static final String TABLE_NAME = "test_account_table"; - private static String APP_ID = "JdbcPollingOperatorTest"; - public String dir = null; - - @BeforeClass - public static void setup() - { - try { - cleanup(); - } catch (Exception e) { - throw new RuntimeException(e); - } - try { - Class.forName(DB_DRIVER).newInstance(); - - Connection con = DriverManager.getConnection(URL); - Statement stmt = con.createStatement(); - - String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME - + " (Account_No INTEGER, Name VARCHAR(255), Amount INTEGER)"; - stmt.executeUpdate(createTable); - - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @AfterClass - public static void cleanup() - { - try { - FileUtils.deleteDirectory(new File("target/" + APP_ID)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static void cleanTable() - { - try { - Connection con = DriverManager.getConnection(URL); - Statement stmt = con.createStatement(); - String cleanTable = "delete from " + TABLE_NAME; - stmt.executeUpdate(cleanTable); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public static void insertEventsInTable(int numEvents, int offset) - { - try { - Connection con = DriverManager.getConnection(URL); - String insert = "insert into " + TABLE_NAME + " values (?,?,?)"; - PreparedStatement stmt = con.prepareStatement(insert); - for (int i = 0; i < numEvents; i++, offset++) { - stmt.setInt(1, offset); - stmt.setString(2, "Account_Holder-" + offset); - stmt.setInt(3, (offset * 1000)); - stmt.executeUpdate(); - } - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - /** - * Simulates actual application flow Adds a batch query partitiom, a pollable - * partition Incremental record polling is also checked - */ - @Test - public void testJdbcPollingInputOperatorBatch() throws InterruptedException - { - cleanTable(); - insertEventsInTable(10, 0); - JdbcStore store = new JdbcStore(); - store.setDatabaseDriver(DB_DRIVER); - store.setDatabaseUrl(URL); - - Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); - this.dir = "target/" + APP_ID + "/"; - attributeMap.put(DAG.APPLICATION_ID, APP_ID); - attributeMap.put(Context.DAGContext.APPLICATION_PATH, dir); - - JdbcPollInputOperator inputOperator = new JdbcPollInputOperator(); - inputOperator.setStore(store); - inputOperator.setBatchSize(100); - inputOperator.setPollInterval(1000); - inputOperator.setEmitColumnList("Account_No,Name,Amount"); - inputOperator.setKey("Account_No"); - inputOperator.setTableName(TABLE_NAME); - inputOperator.setFetchSize(100); - inputOperator.setPartitionCount(1); - - CollectorTestSink<Object> sink = new CollectorTestSink<>(); - inputOperator.outputPort.setSink(sink); - - TestUtils.MockBatchedOperatorStats readerStats = new TestUtils.MockBatchedOperatorStats(2); - - DefaultPartition<AbstractJdbcPollInputOperator<Object>> apartition = new DefaultPartition<AbstractJdbcPollInputOperator<Object>>( - inputOperator); - - TestUtils.MockPartition<AbstractJdbcPollInputOperator<Object>> pseudoParttion = new TestUtils.MockPartition<>( - apartition, readerStats); - - List<Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> newMocks = Lists.newArrayList(); - - newMocks.add(pseudoParttion); - - Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> newPartitions = inputOperator - .definePartitions(newMocks, null); - - Iterator<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> itr = newPartitions - .iterator(); - - int operatorId = 0; - for (com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>> partition : newPartitions) { - - Attribute.AttributeMap.DefaultAttributeMap partitionAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); - this.dir = "target/" + APP_ID + "/"; - partitionAttributeMap.put(DAG.APPLICATION_ID, APP_ID); - partitionAttributeMap.put(Context.DAGContext.APPLICATION_PATH, dir); - - OperatorContextTestHelper.TestIdOperatorContext partitioningContext = new OperatorContextTestHelper.TestIdOperatorContext( - operatorId++, partitionAttributeMap); - - partition.getPartitionedInstance().setup(partitioningContext); - partition.getPartitionedInstance().activate(partitioningContext); - } - - //First partition is for range queries,last is for polling queries - AbstractJdbcPollInputOperator<Object> newInstance = itr.next().getPartitionedInstance(); - CollectorTestSink<Object> sink1 = new CollectorTestSink<>(); - newInstance.outputPort.setSink(sink1); - newInstance.beginWindow(1); - Thread.sleep(50); - newInstance.emitTuples(); - newInstance.endWindow(); - - Assert.assertEquals("rows from db", 10, sink1.collectedTuples.size()); - int i = 0; - for (Object tuple : sink1.collectedTuples) { - String[] pojoEvent = tuple.toString().split(","); - Assert.assertTrue("i=" + i, Integer.parseInt(pojoEvent[0]) == i ? true : false); - i++; - } - sink1.collectedTuples.clear(); - - insertEventsInTable(10, 10); - - AbstractJdbcPollInputOperator<Object> pollableInstance = itr.next().getPartitionedInstance(); - - pollableInstance.outputPort.setSink(sink1); - - pollableInstance.beginWindow(1); - Thread.sleep(pollableInstance.getPollInterval()); - pollableInstance.emitTuples(); - pollableInstance.endWindow(); - - - Assert.assertEquals("rows from db", 10, sink1.collectedTuples.size()); - i = 10; - for (Object tuple : sink1.collectedTuples) { - String[] pojoEvent = tuple.toString().split(","); - Assert.assertTrue("i=" + i, Integer.parseInt(pojoEvent[0]) == i ? true : false); - i++; - } - - sink1.collectedTuples.clear(); - insertEventsInTable(10, 20); - - pollableInstance.beginWindow(2); - Thread.sleep(pollableInstance.getPollInterval()); - pollableInstance.emitTuples(); - pollableInstance.endWindow(); - - Assert.assertEquals("rows from db", 10, sink1.collectedTuples.size()); - - i = 20; - for (Object tuple : sink1.collectedTuples) { - String[] pojoEvent = tuple.toString().split(","); - Assert.assertTrue("i=" + i, Integer.parseInt(pojoEvent[0]) == i ? true : false); - i++; - } - sink1.collectedTuples.clear(); - } - -}
