Repository: apex-malhar
Updated Branches:
  refs/heads/master 42b9e2281 -> 3e7b76b8a


APEXMALHAR-2113 bug fix, app test case,updated test cases


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/3e7b76b8
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/3e7b76b8
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/3e7b76b8

Branch: refs/heads/master
Commit: 3e7b76b8adde635ed1e8c3395cbff666616e87a6
Parents: 42b9e22
Author: devtagare <[email protected]>
Authored: Tue Jun 7 11:28:34 2016 -0700
Committer: devtagare <[email protected]>
Committed: Fri Jun 10 14:11:30 2016 -0700

----------------------------------------------------------------------
 ...stractJdbcTransactionableOutputOperator.java |   2 -
 .../com/datatorrent/lib/db/jdbc/JdbcIOApp.java  |  75 ++++++++
 .../datatorrent/lib/db/jdbc/JdbcIOAppTest.java  | 177 +++++++++++++++++++
 .../JdbcNonTransactionalOutputOperatorTest.java |   7 +-
 4 files changed, 254 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3e7b76b8/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
 
b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
index 77b76c1..fb29233 100644
--- 
a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
@@ -23,7 +23,6 @@ import java.sql.SQLException;
 import java.util.List;
 
 import javax.validation.constraints.Min;
-import javax.validation.constraints.NotNull;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -140,7 +139,6 @@ public abstract class 
AbstractJdbcTransactionableOutputOperator<T>
    *
    * @return the sql statement to update a tuple in the database.
    */
-  @NotNull
   protected abstract String getUpdateCommand();
 
   /**

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3e7b76b8/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java 
b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java
new file mode 100644
index 0000000..4675cdb
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+
+/**
+ * Test application that wires a {@link JdbcPOJOInputOperator} to a {@link JdbcPOJOOutputOperator}.
+ * Rows are read from <code>test_app_event_table</code> and written to
+ * <code>test_app_output_event_table</code>; both operators point at the same in-memory HSQLDB URL
+ * and exchange {@link JdbcIOAppTest.PojoEvent} tuples.
+ */
+@ApplicationAnnotation(name = "JdbcToJdbcApp")
+public class JdbcIOApp implements StreamingApplication
+{
+  private static final String DRIVER_CLASS = "org.hsqldb.jdbcDriver";
+  private static final String DATABASE_URL = "jdbc:hsqldb:mem:test";
+
+  /**
+   * Adds the JDBC reader and writer to the DAG and joins them with a container-local stream.
+   *
+   * @param dag  the DAG to populate
+   * @param conf the application configuration; not read by this application
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // Reader side: plain (non-transactional) store over the source table.
+    JdbcStore sourceStore = new JdbcStore();
+    sourceStore.setDatabaseDriver(DRIVER_CLASS);
+    sourceStore.setDatabaseUrl(DATABASE_URL);
+
+    JdbcPOJOInputOperator reader = dag.addOperator("JdbcInput", new JdbcPOJOInputOperator());
+    reader.setStore(sourceStore);
+    reader.setFieldInfos(eventFieldInfos());
+    reader.setFetchSize(10);
+    reader.setTableName("test_app_event_table");
+    dag.getMeta(reader).getMeta(reader.outputPort).getAttributes()
+        .put(Context.PortContext.TUPLE_CLASS, JdbcIOAppTest.PojoEvent.class);
+
+    // Writer side: transactional store over the destination table.
+    JdbcTransactionalStore sinkStore = new JdbcTransactionalStore();
+    sinkStore.setDatabaseDriver(DRIVER_CLASS);
+    sinkStore.setDatabaseUrl(DATABASE_URL);
+
+    JdbcPOJOOutputOperator writer = dag.addOperator("JdbcOutput", new JdbcPOJOOutputOperator());
+    writer.setStore(sinkStore);
+    writer.setFieldInfos(eventFieldInfos());
+    writer.setTablename("test_app_output_event_table");
+    writer.setBatchSize(10);
+    dag.getMeta(writer).getMeta(writer.input).getAttributes()
+        .put(Context.PortContext.TUPLE_CLASS, JdbcIOAppTest.PojoEvent.class);
+
+    dag.addStream("POJO's", reader.outputPort, writer.input).setLocality(Locality.CONTAINER_LOCAL);
+  }
+
+  /**
+   * Builds a fresh list pairing each table column (ACCOUNT_NO, NAME, AMOUNT) with the
+   * corresponding {@link JdbcIOAppTest.PojoEvent} property. A new list is returned on every call
+   * so the two operators never share one instance.
+   *
+   * @return the field mappings used by both the reader and the writer
+   */
+  private List<FieldInfo> eventFieldInfos()
+  {
+    List<FieldInfo> mappings = Lists.newArrayList();
+    mappings.add(new FieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER));
+    mappings.add(new FieldInfo("NAME", "name", SupportType.STRING));
+    mappings.add(new FieldInfo("AMOUNT", "amount", SupportType.INTEGER));
+    return mappings;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3e7b76b8/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOAppTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOAppTest.java 
b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOAppTest.java
new file mode 100644
index 0000000..726595c
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOAppTest.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Application-level test for {@link JdbcPOJOInputOperator} and {@link JdbcPOJOOutputOperator}:
+ * runs {@link JdbcIOApp} in local mode and checks that every seeded input row reaches the
+ * output table.
+ */
+public class JdbcIOAppTest
+{
+  public static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
+  public static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+
+  private static final String TABLE_NAME = "test_app_event_table";
+  private static final String OUTPUT_TABLE_NAME = "test_app_output_event_table";
+
+  /**
+   * Loads the JDBC driver, creates the meta, input and output tables if they do not exist yet,
+   * and seeds the input table with 10 rows.
+   *
+   * @throws RuntimeException wrapping any failure while loading the driver or running the DDL
+   */
+  @BeforeClass
+  public static void setup()
+  {
+    try {
+      // The driver only needs to be loaded once before the first connection is opened.
+      Class.forName(DB_DRIVER).newInstance();
+
+      // Close the connection and statement once the schema is in place instead of leaking them.
+      try (Connection con = DriverManager.getConnection(URL);
+          Statement stmt = con.createStatement()) {
+
+        String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+            + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+            + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+            + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " + "UNIQUE ("
+            + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", "
+            + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " + ")";
+        stmt.executeUpdate(createMetaTable);
+
+        String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+            + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)";
+        stmt.executeUpdate(createTable);
+        insertEventsInTable(10, 0);
+
+        String createOutputTable = "CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE_NAME
+            + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)";
+        stmt.executeUpdate(createOutputTable);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Inserts {@code numEvents} rows into the input table. Row {@code n} (starting at
+   * {@code offset}) gets ACCOUNT_NO {@code n}, NAME {@code "Account_Holder-" + n} and
+   * AMOUNT {@code n * 1000}.
+   *
+   * @param numEvents number of rows to insert
+   * @param offset    value used for the first row; incremented by one per row
+   * @throws RuntimeException wrapping any {@link SQLException}
+   */
+  public static void insertEventsInTable(int numEvents, int offset)
+  {
+    String insert = "insert into " + TABLE_NAME + " values (?,?,?)";
+    try (Connection con = DriverManager.getConnection(URL);
+        PreparedStatement stmt = con.prepareStatement(insert)) {
+      for (int i = 0; i < numEvents; i++, offset++) {
+        stmt.setInt(1, offset);
+        stmt.setString(2, "Account_Holder-" + offset);
+        stmt.setInt(3, (offset * 1000));
+        stmt.executeUpdate();
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Counts the rows currently present in the output table.
+   *
+   * @return the result of {@code SELECT count(*)} on the output table
+   * @throws RuntimeException wrapping any {@link SQLException}
+   */
+  public int getNumOfEventsInStore()
+  {
+    String countQuery = "SELECT count(*) from " + OUTPUT_TABLE_NAME;
+    try (Connection con = DriverManager.getConnection(URL);
+        Statement stmt = con.createStatement();
+        ResultSet resultSet = stmt.executeQuery(countQuery)) {
+      resultSet.next();
+      return resultSet.getInt(1);
+    } catch (SQLException e) {
+      throw new RuntimeException("fetching count", e);
+    }
+  }
+
+  /**
+   * Runs {@link JdbcIOApp} asynchronously in local mode, waits for it to copy the seeded rows and
+   * asserts that the output table holds all 10 of them.
+   *
+   * @throws Exception if the application cannot be prepared or the wait is interrupted
+   */
+  @Test
+  public void testApplication() throws Exception
+  {
+    LocalMode.Controller lc = null;
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      lma.prepareDAG(new JdbcIOApp(), conf);
+      lc = lma.getController();
+      lc.runAsync();
+
+      // wait for records to be added to table
+      Thread.sleep(3000);
+
+      Assert.assertEquals("Events in store", 10, getNumOfEventsInStore());
+
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    } finally {
+      // runAsync() leaves the local cluster running; stop it so it does not outlive the test.
+      if (lc != null) {
+        lc.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Tuple type exchanged between the input and output operators; its properties correspond to
+   * the ACCOUNT_NO, NAME and AMOUNT columns of the test tables.
+   */
+  public static class PojoEvent
+  {
+    private int accountNumber;
+    private String name;
+    private int amount;
+
+    @Override
+    public String toString()
+    {
+      return "PojoEvent [accountNumber=" + accountNumber + ", name=" + name + ", amount=" + amount + "]";
+    }
+
+    public int getAccountNumber()
+    {
+      return accountNumber;
+    }
+
+    public void setAccountNumber(int accountNumber)
+    {
+      this.accountNumber = accountNumber;
+    }
+
+    public String getName()
+    {
+      return name;
+    }
+
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    public int getAmount()
+    {
+      return amount;
+    }
+
+    public void setAmount(int amount)
+    {
+      this.amount = amount;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3e7b76b8/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java
 
b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java
index 9880aae..3ad6c08 100644
--- 
a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java
+++ 
b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java
@@ -26,8 +26,6 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.List;
 
-import javax.annotation.Nonnull;
-
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -98,7 +96,6 @@ public class JdbcNonTransactionalOutputOperatorTest
       cleanTable();
     }
 
-    @Nonnull
     @Override
     protected String getUpdateCommand()
     {
@@ -141,7 +138,8 @@ public class JdbcNonTransactionalOutputOperatorTest
 
     com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap 
attributeMap = new 
com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
     attributeMap.put(DAG.APPLICATION_ID, APP_ID);
-    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
+    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
+        OPERATOR_ID, attributeMap);
     outputOperator.setStore(store);
 
     outputOperator.setup(context);
@@ -160,4 +158,3 @@ public class JdbcNonTransactionalOutputOperatorTest
     Assert.assertEquals("rows in db", 10, 
outputOperator.getNumOfEventsInStore());
   }
 }
-

Reply via email to