Repository: incubator-apex-malhar Updated Branches: refs/heads/master debf3c0cc -> 2f307965c
APEXMALHAR-2075 enhance jdbcinputpojooperator to support date/time/timestamp fields in POJO Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/138a9f59 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/138a9f59 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/138a9f59 Branch: refs/heads/master Commit: 138a9f59e441c989b6a6e5252479d248facf5bfb Parents: 2459f6c Author: shubham <[email protected]> Authored: Tue May 3 17:03:24 2016 +0530 Committer: shubham <[email protected]> Committed: Fri May 13 18:20:51 2016 +0530 ---------------------------------------------------------------------- .../lib/db/jdbc/JdbcPOJOInputOperator.java | 18 ++--- .../lib/db/jdbc/JdbcOperatorTest.java | 79 ++++++++++++++++++-- 2 files changed, 83 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/138a9f59/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java index a6183c2..db2d27a 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java @@ -292,17 +292,17 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> case Types.TIMESTAMP: Timestamp tsVal = result.getTimestamp(i + 1); - ((PojoUtils.SetterLong<Object>)afi.setterOrGetter).set(obj, tsVal.getTime()); + ((PojoUtils.Setter<Object, Timestamp>)afi.setterOrGetter).set(obj, tsVal); break; case Types.TIME: Time timeVal = result.getTime(i + 1); - ((PojoUtils.SetterLong<Object>)afi.setterOrGetter).set(obj, timeVal.getTime()); + ((PojoUtils.Setter<Object, Time>)afi.setterOrGetter).set(obj, timeVal); break; case Types.DATE: Date dateVal = result.getDate(i + 1); - ((PojoUtils.SetterLong<Object>)afi.setterOrGetter).set(obj, dateVal.getTime()); + ((PojoUtils.Setter<Object, Date>)afi.setterOrGetter).set(obj, dateVal); break; default: @@ -415,18 +415,18 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> break; case Types.TIMESTAMP: - activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, - activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression(),Timestamp.class); break; case Types.TIME: - activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, - activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression(),Time.class); break; case Types.DATE: - activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, - activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression(), Date.class); break; default: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/138a9f59/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java index 1202511..1fef903 100644 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java @@ -19,11 +19,14 @@ package com.datatorrent.lib.db.jdbc; import java.sql.Connection; +import java.sql.Date; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Time; +import java.sql.Timestamp; import java.util.List; import javax.annotation.Nonnull; @@ -70,6 +73,9 @@ public class JdbcOperatorTest { private int id; private String name; + private Date startDate; + private Time startTime; + private Timestamp startTimestamp; public TestPOJOEvent() { @@ -101,6 +107,36 @@ public class JdbcOperatorTest this.name = name; } + public Date getStartDate() + { + return startDate; + } + + public void setStartDate(Date startDate) + { + this.startDate = startDate; + } + + public Time getStartTime() + { + return startTime; + } + + public void setStartTime(Time startTime) + { + this.startTime = startTime; + } + + public Timestamp getStartTimestamp() + { + return startTimestamp; + } + + public void setStartTimestamp(Timestamp startTimestamp) + { + this.startTimestamp = startTimestamp; + } + } @BeforeClass @@ -124,7 +160,7 @@ public class JdbcOperatorTest String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (ID INTEGER)"; stmt.executeUpdate(createTable); String createPOJOTable = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME - + "(id INTEGER not NULL,name VARCHAR(255), PRIMARY KEY ( id ))"; + + "(id INTEGER not NULL,name VARCHAR(255),startDate DATE,startTime TIME,startTimestamp TIMESTAMP, PRIMARY KEY ( id ))"; stmt.executeUpdate(createPOJOTable); } catch (Throwable e) { DTThrowable.rethrow(e); @@ -374,15 +410,18 @@ public class JdbcOperatorTest attributeMap.put(DAG.APPLICATION_ID, APP_ID); OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( OPERATOR_ID, attributeMap); - - insertEventsInTable(10); + + cleanTableAndInsertEvents(10); JdbcPOJOInputOperator inputOperator = new JdbcPOJOInputOperator(); inputOperator.setStore(store); - inputOperator.setTableName(TABLE_NAME); + inputOperator.setTableName(TABLE_POJO_NAME); List<FieldInfo> fieldInfos = Lists.newArrayList(); fieldInfos.add(new FieldInfo("ID", "id", null)); + fieldInfos.add(new FieldInfo("STARTDATE", "startDate", null)); + fieldInfos.add(new FieldInfo("STARTTIME", "startTime", null)); + fieldInfos.add(new FieldInfo("STARTTIMESTAMP", "startTimestamp", null)); inputOperator.setFieldInfos(fieldInfos); inputOperator.setFetchSize(5); @@ -408,6 +447,9 @@ public class JdbcOperatorTest for (Object tuple : sink.collectedTuples) { TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple; Assert.assertTrue("i=" + i, pojoEvent.getId() == i); + Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date); + Assert.assertTrue("time", pojoEvent.getStartTime() instanceof Time); + Assert.assertTrue("timestamp", pojoEvent.getStartTimestamp() instanceof Timestamp); i++; } sink.collectedTuples.clear(); @@ -420,6 +462,9 @@ public class JdbcOperatorTest for (Object tuple : sink.collectedTuples) { TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple; Assert.assertTrue("i=" + i, pojoEvent.getId() == i); + Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date); + Assert.assertTrue("time", pojoEvent.getStartTime() instanceof Time); + Assert.assertTrue("timestamp", pojoEvent.getStartTimestamp() instanceof Timestamp); i++; } @@ -431,5 +476,29 @@ public class JdbcOperatorTest Assert.assertEquals("rows from db", 0, sink.collectedTuples.size()); } -} + + + private void cleanTableAndInsertEvents(int numEvents) + { + try (Connection con = DriverManager.getConnection(URL); Statement stmt = con.createStatement()) { + String cleanTable = "delete from " + TABLE_POJO_NAME; + stmt.executeUpdate(cleanTable); + + String insert = "insert into " + TABLE_POJO_NAME + " values (?,?,?,?,?)"; + PreparedStatement pStmt = con.prepareStatement(insert); + con.prepareStatement(insert); + for (int i = 0; i < numEvents; i++) { + pStmt.setInt(1, i); + pStmt.setString(2, "name"); + pStmt.setDate(3, new Date(2016, 1, 1)); + pStmt.setTime(4, new Time(2016, 1, 1)); + pStmt.setTimestamp(5, new Timestamp(2016, 1, 1, 0, 0, 0, 0)); + pStmt.executeUpdate(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + +}
