Repository: apex-malhar Updated Branches: refs/heads/master 42b9e2281 -> 3e7b76b8a
APEXMALHAR-2113 bug fix, app test case,updated test cases Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/3e7b76b8 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/3e7b76b8 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/3e7b76b8 Branch: refs/heads/master Commit: 3e7b76b8adde635ed1e8c3395cbff666616e87a6 Parents: 42b9e22 Author: devtagare <[email protected]> Authored: Tue Jun 7 11:28:34 2016 -0700 Committer: devtagare <[email protected]> Committed: Fri Jun 10 14:11:30 2016 -0700 ---------------------------------------------------------------------- ...stractJdbcTransactionableOutputOperator.java | 2 - .../com/datatorrent/lib/db/jdbc/JdbcIOApp.java | 75 ++++++++ .../datatorrent/lib/db/jdbc/JdbcIOAppTest.java | 177 +++++++++++++++++++ .../JdbcNonTransactionalOutputOperatorTest.java | 7 +- 4 files changed, 254 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3e7b76b8/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java index 77b76c1..fb29233 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java @@ -23,7 +23,6 @@ import java.sql.SQLException; import java.util.List; import javax.validation.constraints.Min; -import javax.validation.constraints.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,7 +139,6 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> * * @return the sql statement to update a tuple in the database. */ - @NotNull protected abstract String getUpdateCommand(); /** http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3e7b76b8/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java new file mode 100644 index 0000000..4675cdb --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.db.jdbc; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.FieldInfo.SupportType; + +@ApplicationAnnotation(name = "JdbcToJdbcApp") +public class JdbcIOApp implements StreamingApplication +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + JdbcPOJOInputOperator jdbcInputOperator = dag.addOperator("JdbcInput", new JdbcPOJOInputOperator()); + JdbcStore store = new JdbcStore(); + store.setDatabaseDriver("org.hsqldb.jdbcDriver"); + store.setDatabaseUrl("jdbc:hsqldb:mem:test"); + jdbcInputOperator.setStore(store); + jdbcInputOperator.setFieldInfos(addFieldInfos()); + jdbcInputOperator.setFetchSize(10); + jdbcInputOperator.setTableName("test_app_event_table"); + dag.getMeta(jdbcInputOperator).getMeta(jdbcInputOperator.outputPort).getAttributes() + .put(Context.PortContext.TUPLE_CLASS, JdbcIOAppTest.PojoEvent.class); + + JdbcPOJOOutputOperator jdbcOutputOperator = dag.addOperator("JdbcOutput", new JdbcPOJOOutputOperator()); + JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); + outputStore.setDatabaseDriver("org.hsqldb.jdbcDriver"); + outputStore.setDatabaseUrl("jdbc:hsqldb:mem:test"); + jdbcOutputOperator.setStore(outputStore); + jdbcOutputOperator.setFieldInfos(addFieldInfos()); + jdbcOutputOperator.setTablename("test_app_output_event_table"); + jdbcOutputOperator.setBatchSize(10); + dag.getMeta(jdbcOutputOperator).getMeta(jdbcOutputOperator.input).getAttributes() + .put(Context.PortContext.TUPLE_CLASS, JdbcIOAppTest.PojoEvent.class); + + dag.addStream("POJO's", jdbcInputOperator.outputPort, jdbcOutputOperator.input) + .setLocality(Locality.CONTAINER_LOCAL); + } + + private List<FieldInfo> addFieldInfos() + { + List<FieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new FieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER)); + fieldInfos.add(new FieldInfo("NAME", "name", SupportType.STRING)); + fieldInfos.add(new FieldInfo("AMOUNT", "amount", SupportType.INTEGER)); + return fieldInfos; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3e7b76b8/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOAppTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOAppTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOAppTest.java new file mode 100644 index 0000000..726595c --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOAppTest.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.db.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +/** + * App test for {@link JdbcPOJOInputOperator and JdbcPOJOOutputOperator} + */ +public class JdbcIOAppTest +{ + public static final String DB_DRIVER = "org.hsqldb.jdbcDriver"; + public static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; + + private static final String TABLE_NAME = "test_app_event_table"; + private static final String OUTPUT_TABLE_NAME = "test_app_output_event_table"; + + @BeforeClass + public static void setup() + { + try { + + Class.forName(DB_DRIVER).newInstance(); + + Connection con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + + String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " + + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " + + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " + + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " + "UNIQUE (" + + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " + ")"; + stmt.executeUpdate(createMetaTable); + + Class.forName(DB_DRIVER).newInstance(); + + String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)"; + stmt.executeUpdate(createTable); + insertEventsInTable(10, 0); + + String createOutputTable = "CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE_NAME + + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)"; + stmt.executeUpdate(createOutputTable); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void insertEventsInTable(int numEvents, int offset) + { + try { + Connection con = DriverManager.getConnection(URL); + String insert = "insert into " + TABLE_NAME + " values (?,?,?)"; + PreparedStatement stmt = con.prepareStatement(insert); + for (int i = 0; i < numEvents; i++, offset++) { + stmt.setInt(1, offset); + stmt.setString(2, "Account_Holder-" + offset); + stmt.setInt(3, (offset * 1000)); + stmt.executeUpdate(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public int getNumOfEventsInStore() + { + Connection con; + try { + con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + + String countQuery = "SELECT count(*) from " + OUTPUT_TABLE_NAME; + ResultSet resultSet = stmt.executeQuery(countQuery); + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException e) { + throw new RuntimeException("fetching count", e); + } + } + + @Test + public void testApplication() throws Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new JdbcIOApp(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + + // wait for records to be added to table + Thread.sleep(3000); + + Assert.assertEquals("Events in store", 10, getNumOfEventsInStore()); + + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + + public static class PojoEvent + { + @Override + public String toString() + { + return "PojoEvent [accountNumber=" + accountNumber + ", name=" + name + ", amount=" + amount + "]"; + } + + private int accountNumber; + private String name; + private int amount; + + public int getAccountNumber() + { + return accountNumber; + } + + public void setAccountNumber(int accountNumber) + { + this.accountNumber = accountNumber; + } + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public int getAmount() + { + return amount; + } + + public void setAmount(int amount) + { + this.amount = amount; + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3e7b76b8/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java index 9880aae..3ad6c08 100644 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java @@ -26,8 +26,6 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.List; -import javax.annotation.Nonnull; - import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -98,7 +96,6 @@ public class JdbcNonTransactionalOutputOperatorTest cleanTable(); } - @Nonnull @Override protected String getUpdateCommand() { @@ -141,7 +138,8 @@ public class JdbcNonTransactionalOutputOperatorTest com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); attributeMap.put(DAG.APPLICATION_ID, APP_ID); - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); outputOperator.setStore(store); outputOperator.setup(context); @@ -160,4 +158,3 @@ public class JdbcNonTransactionalOutputOperatorTest Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore()); } } -
