Repository: apex-malhar Updated Branches: refs/heads/master 13883da68 -> ddd5bcf1a
APEXMALHAR-1953: Added JdbcPOJOInsertOutputOperator for insert queries. Added support for automatic mapping of fields from POJO to DB columns. Added unit tests. Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/f54ba320 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/f54ba320 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/f54ba320 Branch: refs/heads/master Commit: f54ba3205d4177f63363d5f00e9a3548e5f89a96 Parents: f6ba2d0 Author: bhupesh <[email protected]> Authored: Tue Mar 15 18:58:30 2016 +0530 Committer: bhupesh <[email protected]> Committed: Fri Jul 1 11:53:02 2016 +0530 ---------------------------------------------------------------------- .../db/jdbc/AbstractJdbcPOJOOutputOperator.java | 79 +------- ...stractJdbcTransactionableOutputOperator.java | 15 +- .../db/jdbc/JdbcPOJOInsertOutputOperator.java | 182 +++++++++++++++++++ .../lib/db/jdbc/JdbcOperatorTest.java | 127 ++++++++++++- 4 files changed, 324 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f54ba320/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java index da491aa..c310a40 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java @@ -21,10 +21,7 @@ package com.datatorrent.lib.db.jdbc; import java.math.BigDecimal; import java.sql.Date; import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; 
-import java.sql.Statement; import java.sql.Time; import java.sql.Timestamp; import java.sql.Types; @@ -40,7 +37,6 @@ import com.google.common.collect.Lists; import com.datatorrent.api.Context; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.Operator; import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.lib.util.FieldInfo; import com.datatorrent.lib.util.PojoUtils; @@ -63,22 +59,18 @@ import com.datatorrent.lib.util.PojoUtils.GetterShort; * @since 2.1.0 */ @org.apache.hadoop.classification.InterfaceStability.Evolving -public class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object> - implements Operator.ActivationListener<OperatorContext> +public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object> { - @NotNull private List<FieldInfo> fieldInfos; - private List<Integer> columnDataTypes; + protected List<Integer> columnDataTypes; @NotNull private String tablename; - private final transient List<JdbcPOJOInputOperator.ActiveFieldInfo> columnFieldGetters; - - private String insertStatement; + protected final transient List<JdbcPOJOInputOperator.ActiveFieldInfo> columnFieldGetters; - private transient Class<?> pojoClass; + protected transient Class<?> pojoClass; @InputPortFieldAnnotation(optional = true, schemaRequired = true) public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() @@ -97,57 +89,6 @@ public class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableO }; - @Override - public void setup(OperatorContext context) - { - StringBuilder columns = new StringBuilder(); - StringBuilder values = new StringBuilder(); - for (int i = 0; i < fieldInfos.size(); i++) { - columns.append(fieldInfos.get(i).getColumnName()); - values.append("?"); - if (i < fieldInfos.size() - 1) { - columns.append(","); - values.append(","); - } - } - 
insertStatement = "INSERT INTO " - + tablename - + " (" + columns.toString() + ")" - + " VALUES (" + values.toString() + ")"; - LOG.debug("insert statement is {}", insertStatement); - - super.setup(context); - - if (columnDataTypes == null) { - try { - populateColumnDataTypes(columns.toString()); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - for (FieldInfo fi : fieldInfos) { - columnFieldGetters.add(new JdbcPOJOInputOperator.ActiveFieldInfo(fi)); - } - } - - protected void populateColumnDataTypes(String columns) throws SQLException - { - columnDataTypes = Lists.newArrayList(); - try (Statement st = store.getConnection().createStatement()) { - ResultSet rs = st.executeQuery("select " + columns + " from " + tablename); - - ResultSetMetaData rsMetaData = rs.getMetaData(); - LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount()); - - for (int i = 1; i <= rsMetaData.getColumnCount(); i++) { - int type = rsMetaData.getColumnType(i); - columnDataTypes.add(type); - LOG.debug("column name {} type {}", rsMetaData.getColumnName(i), type); - } - } - } - public AbstractJdbcPOJOOutputOperator() { super(); @@ -155,13 +96,6 @@ public class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableO } @Override - protected String getUpdateCommand() - { - LOG.debug("insert statement is {}", insertStatement); - return insertStatement; - } - - @Override @SuppressWarnings("unchecked") protected void setStatementParameters(PreparedStatement statement, Object tuple) throws SQLException { @@ -271,6 +205,7 @@ public class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableO @Override public void activate(OperatorContext context) { + super.activate(context); final int size = columnDataTypes.size(); for (int i = 0; i < size; i++) { final int type = columnDataTypes.get(i); @@ -345,8 +280,4 @@ public class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableO } } - @Override - public void deactivate() - { - 
} } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f54ba320/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java index fb29233..d3300fc 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java @@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.Operator; import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator; /** @@ -56,6 +58,7 @@ import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator */ public abstract class AbstractJdbcTransactionableOutputOperator<T> extends AbstractPassThruTransactionableStoreOutputOperator<T, JdbcTransactionalStore> + implements Operator.ActivationListener<Context.OperatorContext> { protected static int DEFAULT_BATCH_SIZE = 1000; @@ -78,12 +81,17 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> public void setup(Context.OperatorContext context) { super.setup(context); + + } + + @Override + public void activate(OperatorContext context) + { try { updateCommand = store.connection.prepareStatement(getUpdateCommand()); } catch (SQLException e) { throw new RuntimeException(e); } - } @Override @@ -98,6 +106,11 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> } @Override + public void deactivate() + { + } + + @Override public void processTuple(T tuple) { tuples.add(tuple); 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f54ba320/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java new file mode 100644 index 0000000..09bab2f --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.db.jdbc; + +import java.lang.reflect.Field; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.lib.util.FieldInfo; + +/** + * <p> + * JdbcPOJOInsertOutputOperator class.</p> + * An implementation of AbstractJdbcTransactionableOutputOperator which takes in any POJO. 
+ * + * @displayName Jdbc Output Operator + * @category Output + * @tags database, sql, pojo, jdbc + * @since 2.1.0 + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator +{ + String insertStatement; + List<String> columnNames; + List<Integer> columnNullabilities; + String columnString; + String valueString; + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + + // Populate columnNames and columnDataTypes + try { + if (getFieldInfos() == null) { // then assume direct mapping + LOG.info("Assuming direct mapping between POJO fields and DB columns"); + populateColumnDataTypes(null); + } else { + // FieldInfo supplied by user + StringBuilder columns = new StringBuilder(); + StringBuilder values = new StringBuilder(); + for (int i = 0; i < getFieldInfos().size(); i++) { + columns.append(getFieldInfos().get(i).getColumnName()); + values.append("?"); + if (i < getFieldInfos().size() - 1) { + columns.append(","); + values.append(","); + } + } + populateColumnDataTypes(columns.toString()); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public void activate(OperatorContext context) + { + if(getFieldInfos() == null) { + Field[] fields = pojoClass.getDeclaredFields(); + // Create fieldInfos in case of direct mapping + List<FieldInfo> fieldInfos = Lists.newArrayList(); + for (int i = 0; i < columnNames.size(); i++) { + String columnName = columnNames.get(i); + String pojoField = getMatchingField(fields, columnName); + + if(columnNullabilities.get(i) == ResultSetMetaData.columnNoNulls && + (pojoField == null || pojoField.length() == 0)) { + throw new RuntimeException("Data for a non-nullable field not found in POJO"); + } else { + if(pojoField != null && pojoField.length() != 0) { + FieldInfo fi = new FieldInfo(columnName, pojoField, null); + fieldInfos.add(fi); + } else { + columnDataTypes.remove(i); + columnNames.remove(i); 
columnNullabilities.remove(i); + i--; + } + } + } + setFieldInfos(fieldInfos); + } + + for (FieldInfo fi : getFieldInfos()) { + columnFieldGetters.add(new JdbcPOJOInputOperator.ActiveFieldInfo(fi)); + } + + StringBuilder columns = new StringBuilder(); + StringBuilder values = new StringBuilder(); + + for (int i = 0; i < columnNames.size(); i++) { + columns.append(columnNames.get(i)); + values.append("?"); + if (i < columnNames.size() - 1) { + columns.append(","); + values.append(","); + } + } + + insertStatement = "INSERT INTO " + + getTablename() + + " (" + columns.toString() + ")" + + " VALUES (" + values.toString() + ")"; + LOG.debug("insert statement is {}", insertStatement); + + super.activate(context); + } + + private String getMatchingField(Field[] fields, String columnName) + { + for (Field f: fields) { + if(f.getName().equalsIgnoreCase(columnName)) { + return f.getName(); + } + } + return null; + } + + protected void populateColumnDataTypes(String columns) throws SQLException + { + columnNames = Lists.newArrayList(); + columnDataTypes = Lists.newArrayList(); + columnNullabilities = Lists.newArrayList(); + + try (Statement st = store.getConnection().createStatement()) { + if (columns == null || columns.length() == 0) { + columns = "*"; + } + ResultSet rs = st.executeQuery("select " + columns + " from " + getTablename()); + + ResultSetMetaData rsMetaData = rs.getMetaData(); + LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount()); + + for (int i = 1; i <= rsMetaData.getColumnCount(); i++) { + int type = rsMetaData.getColumnType(i); + String columnName = rsMetaData.getColumnName(i); + columnNames.add(columnName); + columnDataTypes.add(type); + columnNullabilities.add(rsMetaData.isNullable(i)); + LOG.debug("column name {} type {}", rsMetaData.getColumnName(i), type); + } + } + } + + + @Override + protected String getUpdateCommand() + { + return insertStatement; + } + + private static final Logger LOG = 
LoggerFactory.getLogger(JdbcPOJOInsertOutputOperator.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f54ba320/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java index ad7e676..26196f5 100644 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java @@ -56,6 +56,8 @@ public class JdbcOperatorTest private static final String TABLE_NAME = "test_event_table"; private static final String TABLE_POJO_NAME = "test_pojo_event_table"; + private static final String TABLE_POJO_NAME_ID_DIFF = "test_pojo_event_table_id_diff"; + private static final String TABLE_POJO_NAME_NAME_DIFF = "test_pojo_event_table_name_diff"; private static String APP_ID = "JdbcOperatorTest"; private static int OPERATOR_ID = 0; @@ -162,6 +164,12 @@ public class JdbcOperatorTest String createPOJOTable = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME + "(id INTEGER not NULL,name VARCHAR(255),startDate DATE,startTime TIME,startTimestamp TIMESTAMP, PRIMARY KEY ( id ))"; stmt.executeUpdate(createPOJOTable); + String createPOJOTableIdDiff = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME_ID_DIFF + + "(id1 INTEGER not NULL,name VARCHAR(255), PRIMARY KEY ( id1 ))"; + stmt.executeUpdate(createPOJOTableIdDiff); + String createPOJOTableNameDiff = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME_NAME_DIFF + + "(id INTEGER not NULL,name1 VARCHAR(255), PRIMARY KEY ( id ))"; + stmt.executeUpdate(createPOJOTableNameDiff); } catch (Throwable e) { DTThrowable.rethrow(e); } @@ -176,6 +184,9 @@ public class JdbcOperatorTest String cleanTable = "delete from " + TABLE_NAME; stmt.executeUpdate(cleanTable); + cleanTable = "delete from " + TABLE_POJO_NAME; + 
stmt.executeUpdate(cleanTable); + cleanTable = "delete from " + JdbcTransactionalStore.DEFAULT_META_TABLE; stmt.executeUpdate(cleanTable); } catch (SQLException e) { @@ -238,21 +249,37 @@ public class JdbcOperatorTest } } - private static class TestPOJOOutputOperator extends AbstractJdbcPOJOOutputOperator + private static class TestPOJOOutputOperator extends JdbcPOJOInsertOutputOperator { TestPOJOOutputOperator() { cleanTable(); } - public int getNumOfEventsInStore() + public int getNumOfEventsInStore(String tableName) { Connection con; try { con = DriverManager.getConnection(URL); Statement stmt = con.createStatement(); - String countQuery = "SELECT count(*) from " + TABLE_POJO_NAME; + String countQuery = "SELECT count(*) from " + tableName; + ResultSet resultSet = stmt.executeQuery(countQuery); + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException e) { + throw new RuntimeException("fetching count", e); + } + } + + public int getNumOfNullEventsInStore(String tableName) + { + Connection con; + try { + con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + + String countQuery = "SELECT count(*) from " + tableName + " where name1 is null"; ResultSet resultSet = stmt.executeQuery(countQuery); resultSet.next(); return resultSet.getInt(1); @@ -309,6 +336,7 @@ public class JdbcOperatorTest outputOperator.setup(context); + outputOperator.activate(context); List<TestEvent> events = Lists.newArrayList(); for (int i = 0; i < 10; i++) { events.add(new TestEvent(i)); @@ -368,7 +396,98 @@ public class JdbcOperatorTest } outputOperator.endWindow(); - Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore()); + Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME)); + } + + /** + * This test will assume direct mapping for POJO fields to DB columns + */ + @Test + public void testJdbcPojoInsertOutputOperator() + { + JdbcTransactionalStore transactionalStore = new 
JdbcTransactionalStore(); + transactionalStore.setDatabaseDriver(DB_DRIVER); + transactionalStore.setDatabaseUrl(URL); + + com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = + new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); + + TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator(); + outputOperator.setBatchSize(3); + outputOperator.setTablename(TABLE_POJO_NAME); + + outputOperator.setStore(transactionalStore); + + outputOperator.setup(context); + + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + TestPortContext tpc = new TestPortContext(portAttributes); + outputOperator.input.setup(tpc); + + outputOperator.activate(context); + + List<TestPOJOEvent> events = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + events.add(new TestPOJOEvent(i, "test" + i)); + } + + outputOperator.beginWindow(0); + for (TestPOJOEvent event : events) { + outputOperator.input.process(event); + } + outputOperator.endWindow(); + + Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME)); + } + + /** + * This test will assume direct mapping for POJO fields to DB columns + */ + @Test + public void testJdbcPojoInsertOutputOperatorNullName() + { + JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore(); + transactionalStore.setDatabaseDriver(DB_DRIVER); + transactionalStore.setDatabaseUrl(URL); + + com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = + new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context 
= new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); + + TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator(); + outputOperator.setBatchSize(3); + outputOperator.setTablename(TABLE_POJO_NAME_NAME_DIFF); + + outputOperator.setStore(transactionalStore); + + outputOperator.setup(context); + + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + TestPortContext tpc = new TestPortContext(portAttributes); + outputOperator.input.setup(tpc); + + outputOperator.activate(context); + + List<TestPOJOEvent> events = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + events.add(new TestPOJOEvent(i, "test" + i)); + } + + outputOperator.beginWindow(0); + for (TestPOJOEvent event : events) { + outputOperator.input.process(event); + } + outputOperator.endWindow(); + + Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME_NAME_DIFF)); + Assert.assertEquals("null name rows in db", 10, outputOperator.getNumOfNullEventsInStore(TABLE_POJO_NAME_NAME_DIFF)); } @Test
