Repository: apex-malhar Updated Branches: refs/heads/master 2b775968a -> 5ae58d039
APEXMALHAR-2291 Fix for Exactly-once processing of JdbcPOJOInsertOutput Operator. Added a check in endWindow() to not to commit if committed window id is greater than current window id. Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5ae58d03 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5ae58d03 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5ae58d03 Branch: refs/heads/master Commit: 5ae58d039dfba2b9debcc8e42f4f328e34d5bdc2 Parents: 2b77596 Author: Hitesh-Scorpio <[email protected]> Authored: Fri Oct 14 20:45:38 2016 +0530 Committer: Hitesh-Scorpio <[email protected]> Committed: Wed Oct 26 17:11:58 2016 +0530 ---------------------------------------------------------------------- ...sThruTransactionableStoreOutputOperator.java | 8 +- .../lib/db/jdbc/JdbcPojoOperatorTest.java | 92 ++++++++++++++++++++ 2 files changed, 97 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5ae58d03/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java index b471a63..d21bd01 100644 --- a/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java @@ -49,9 +49,11 @@ public abstract class AbstractPassThruTransactionableStoreOutputOperator<T, S ex @Override public void endWindow() { - store.storeCommittedWindowId(appId, operatorId, currentWindowId); - store.commitTransaction(); - committedWindowId = currentWindowId; + if ( committedWindowId < currentWindowId ) { + store.storeCommittedWindowId(appId, operatorId, currentWindowId); + store.commitTransaction(); + committedWindowId = currentWindowId; + } super.endWindow(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5ae58d03/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java index e6d8b42..91cb2f2 100644 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java @@ -335,6 +335,98 @@ public class JdbcPojoOperatorTest extends JdbcOperatorTest } /** + * This test will assume direct mapping for POJO fields to DB columns All + * fields in DB present in POJO and will test it for exactly once criteria + */ + @Test + public void testJdbcPojoInsertOutputOperatorExactlyOnce() + { + JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore(); + transactionalStore.setDatabaseDriver(DB_DRIVER); + transactionalStore.setDatabaseUrl(URL); + + com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); + + TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator(); + outputOperator.setBatchSize(3); + outputOperator.setTablename(TABLE_POJO_NAME); + + outputOperator.setStore(transactionalStore); + + outputOperator.setup(context); + + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + TestPortContext tpc = new TestPortContext(portAttributes); + outputOperator.input.setup(tpc); + + CollectorTestSink<Object> errorSink = new CollectorTestSink<>(); + TestUtils.setSink(outputOperator.error, errorSink); + + outputOperator.activate(context); + + List<TestPOJOEvent> events = Lists.newArrayList(); + for (int i = 0; i < 70; i++) { + events.add(new TestPOJOEvent(i, "test" + i)); + } + + outputOperator.beginWindow(0); + for (int i = 0; i < 10; i++) { + outputOperator.input.process(events.get(i)); + } + outputOperator.endWindow(); + + outputOperator.beginWindow(1); + for (int i = 10; i < 20; i++) { + outputOperator.input.process(events.get(i)); + } + outputOperator.endWindow(); + + outputOperator.beginWindow(2); + for (int i = 20; i < 30; i++) { + outputOperator.input.process(events.get(i)); + } + outputOperator.endWindow(); + + outputOperator.setup(context); + outputOperator.input.setup(tpc); + outputOperator.activate(context); + + outputOperator.beginWindow(0); + for (int i = 30; i < 40; i++) { + outputOperator.input.process(events.get(i)); + } + outputOperator.endWindow(); + + outputOperator.beginWindow(1); + for (int i = 40; i < 50; i++) { + outputOperator.input.process(events.get(i)); + } + outputOperator.endWindow(); + + outputOperator.beginWindow(2); + for (int i = 50; i < 60; i++) { + outputOperator.input.process(events.get(i)); + } + + outputOperator.beginWindow(3); + for (int i = 60; i < 70; i++) { + outputOperator.input.process(events.get(i)); + } + outputOperator.endWindow(); + + outputOperator.deactivate(); + outputOperator.teardown(); + + Assert.assertEquals("rows in db", 40, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME)); + + } + + + /** * This test will assume direct mapping for POJO fields to DB columns Nullable * DB field missing in POJO name1 field, which is nullable in DB is missing * from POJO POJO(id, name) -> DB(id, name1)
