Repository: incubator-apex-malhar Updated Branches: refs/heads/master 78c5fad19 -> ef12eb0cf
JdbcPOJOInputOperator polling fix Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/4fbc0387 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/4fbc0387 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/4fbc0387 Branch: refs/heads/master Commit: 4fbc0387d06e14977261590d295d720e67e83e7e Parents: 9c11400 Author: Sandeep Deshmukh <[email protected]> Authored: Wed May 25 19:37:32 2016 +0530 Committer: Sandeep Deshmukh <[email protected]> Committed: Wed May 25 19:37:32 2016 +0530 ---------------------------------------------------------------------- .../lib/db/jdbc/JdbcPOJOInputOperator.java | 8 ++---- .../lib/db/jdbc/JdbcOperatorTest.java | 29 ++++++++++++++++---- 2 files changed, 26 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4fbc0387/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java index db2d27a..2e0993f 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java @@ -95,8 +95,6 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> private transient PreparedStatement preparedStatement; protected transient Class<?> pojoClass; - protected int pageNumber; - @AutoMetric protected long tuplesRead; @@ -188,7 +186,6 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> public void beginWindow(long l) { windowDone = false; - tuplesRead = 0; } @Override @@ -209,7 +206,6 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> windowDone = true; } resultSet.close(); - pageNumber++; } catch (SQLException ex) { store.disconnect(); throw new RuntimeException(ex); @@ -220,9 +216,9 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> protected void setRuntimeParams() throws SQLException { if (mysqlSyntax) { - preparedStatement.setLong(1, pageNumber * fetchSize); + preparedStatement.setLong(1, tuplesRead); } else { - preparedStatement.setLong(1, pageNumber * fetchSize); + preparedStatement.setLong(1, tuplesRead); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4fbc0387/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java index 1fef903..6f2688f 100644 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java @@ -411,7 +411,7 @@ public class JdbcOperatorTest OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( OPERATOR_ID, attributeMap); - cleanTableAndInsertEvents(10); + insertEvents(10,true, 0); JdbcPOJOInputOperator inputOperator = new JdbcPOJOInputOperator(); inputOperator.setStore(store); @@ -475,21 +475,40 @@ public class JdbcOperatorTest inputOperator.endWindow(); Assert.assertEquals("rows from db", 0, sink.collectedTuples.size()); + + // Insert 3 more tuples and check if they are read successfully. + insertEvents(3, false, 10); + + inputOperator.beginWindow(3); + inputOperator.emitTuples(); + inputOperator.endWindow(); + + Assert.assertEquals("rows from db", 3, sink.collectedTuples.size()); + for (Object tuple : sink.collectedTuples) { + TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple; + Assert.assertTrue("i=" + i, pojoEvent.getId() == i); + Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date); + Assert.assertTrue("time", pojoEvent.getStartTime() instanceof Time); + Assert.assertTrue("timestamp", pojoEvent.getStartTimestamp() instanceof Timestamp); + i++; + } } - private void cleanTableAndInsertEvents(int numEvents) + private void insertEvents(int numEvents, boolean cleanExistingRows, int startRowId) { try (Connection con = DriverManager.getConnection(URL); Statement stmt = con.createStatement()) { - String cleanTable = "delete from " + TABLE_POJO_NAME; - stmt.executeUpdate(cleanTable); + if (cleanExistingRows) { + String cleanTable = "delete from " + TABLE_POJO_NAME; + stmt.executeUpdate(cleanTable); + } String insert = "insert into " + TABLE_POJO_NAME + " values (?,?,?,?,?)"; PreparedStatement pStmt = con.prepareStatement(insert); con.prepareStatement(insert); for (int i = 0; i < numEvents; i++) { - pStmt.setInt(1, i); + pStmt.setInt(1, startRowId + i); pStmt.setString(2, "name"); pStmt.setDate(3, new Date(2016, 1, 1)); pStmt.setTime(4, new Time(2016, 1, 1));
