Repository: incubator-apex-malhar Updated Branches: refs/heads/jdbc-schemaIntegration [created] af203f269
MLHR-1836 #comment Schema integration with JDBC operators Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/af203f26 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/af203f26 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/af203f26 Branch: refs/heads/jdbc-schemaIntegration Commit: af203f2694ed7dc893f4671695337b7937d27749 Parents: a9bcce1 Author: Chandni Singh <[email protected]> Authored: Fri Jul 31 17:43:45 2015 -0700 Committer: Chandni Singh <[email protected]> Committed: Thu Sep 3 16:21:34 2015 -0700 ---------------------------------------------------------------------- .../lib/db/jdbc/JdbcPOJOInputOperator.java | 582 +++++++++++++++++++ .../lib/db/jdbc/JdbcPOJOOutputOperator.java | 315 ++++++---- .../com/datatorrent/lib/db/jdbc/JdbcStore.java | 21 +- .../lib/db/jdbc/JdbcOperatorTest.java | 140 +++-- .../datatorrent/lib/helper/TestPortContext.java | 59 ++ pom.xml | 3 + 6 files changed, 940 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/af203f26/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java new file mode 100644 index 0000000..d5a8e1f --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java @@ -0,0 +1,582 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datatorrent.lib.db.jdbc; + +import java.math.BigDecimal; +import java.sql.*; +import java.util.List; +import java.util.Map; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; + +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.PojoUtils; + +/** + * A concrete input operator that reads from a database through the JDBC API.<br/> + * This operator by default uses the {@link #fieldInfos} and the table name to construct the sql query.<br/> + * + * A user can provide their own query by setting the {@link #query} property which takes precedence.<br/> + * + * For eg.
user can set the query property to a complex one : "select x1, x2 from t1, t2 where t1.x3 = t2.x3 ;"<br/> + * + * @displayName Jdbc Input Operator + * @category Input + * @tags database, sql, pojo, jdbc + * @since 2.1.0 + */ +public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> implements Operator.ActivationListener<Context.OperatorContext> +{ + private static int DEF_BATCH_SIZE = 100; + + //table and query definition + protected String tableName; + protected String whereCondition; + protected String groupByClause; + protected String havingCondition; + protected String orderByExpr; + protected String query; + protected boolean limitSupported; + + @NotNull + protected List<FieldInfo> fieldInfos; + + private long offset; + + @Min(1) + private int batchSize; + + private final transient List<ActiveFieldInfo> columnFieldSetters; + private transient boolean windowDone; + + protected String columnsExpression; + protected List<Integer> columnDataTypes; + + private transient PreparedStatement preparedStatement; + protected transient Class<?> pojoClass; + + @OutputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>() + { + @Override + public void setup(Context.PortContext context) + { + pojoClass = context.getValue(Context.PortContext.TUPLE_CLASS); + } + }; + + public JdbcPOJOInputOperator() + { + super(); + batchSize = DEF_BATCH_SIZE; + limitSupported = true; + columnFieldSetters = Lists.newArrayList(); + } + + @Override + public void setup(Context.OperatorContext context) + { + Preconditions.checkArgument(query != null || tableName != null, "both query and table name are not set"); + super.setup(context); + + try { + //closing the query statement in super class as it is not needed + queryStatement.close(); + if (query == null && columnsExpression == null) { + StringBuilder columns = new StringBuilder(); + for (int i = 0; i < fieldInfos.size(); i++) { + 
columns.append(fieldInfos.get(i).getColumnName()); + if (i < fieldInfos.size() - 1) { + columns.append(","); + } + } + columnsExpression = columns.toString(); + LOG.debug("select expr {}", columnsExpression); + } + + preparedStatement = store.connection.prepareStatement(queryToRetrieveData()); + if (columnDataTypes == null) { + populateColumnDataTypes(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + + for (FieldInfo fi : fieldInfos) { + columnFieldSetters.add(new ActiveFieldInfo(fi)); + } + } + + protected void populateColumnDataTypes() throws SQLException + { + columnDataTypes = Lists.newArrayList(); + preparedStatement.setLong(1, 0); + preparedStatement.setLong(2, 1); + try (ResultSet rs = preparedStatement.executeQuery()) { + Map<String, Integer> nameToType = Maps.newHashMap(); + ResultSetMetaData rsMetaData = rs.getMetaData(); + LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount()); + + for (int i = 1; i <= rsMetaData.getColumnCount(); i++) { + int type = rsMetaData.getColumnType(i); + String name = rsMetaData.getColumnName(i); + LOG.debug("column name {} type {}", name, type); + if (query == null) { + columnDataTypes.add(type); + } else { + //when it is a custom query we need to ensure the types are in the same order as field infos + nameToType.put(name, type); + } + } + + if (query != null) { + for (FieldInfo fieldInfo : fieldInfos) { + columnDataTypes.add(nameToType.get(fieldInfo.getColumnName())); + } + } + } + } + + @Override + public void beginWindow(long l) + { + windowDone = false; + } + + @Override + public void emitTuples() + { + if (!windowDone) { + try { + preparedStatement.setLong(1, offset); + preparedStatement.setLong(2, batchSize); + + ResultSet resultSet = preparedStatement.executeQuery(); + if (resultSet.next()) { + do { + Object tuple = getTuple(resultSet); + outputPort.emit(tuple); + offset++; + } + while (resultSet.next()); + } else { + windowDone = true; + } + resultSet.close(); + } catch 
(SQLException ex) { + store.disconnect(); + throw new RuntimeException(ex); + } + } + } + + @SuppressWarnings("unchecked") + @Override + public Object getTuple(ResultSet result) + { + Object obj; + try { + obj = pojoClass.newInstance(); + } catch (InstantiationException | IllegalAccessException ex) { + store.disconnect(); + throw new RuntimeException(ex); + } + + try { + for (int i = 0; i < fieldInfos.size(); i++) { + int type = columnDataTypes.get(i); + ActiveFieldInfo afi = columnFieldSetters.get(i); + + switch (type) { + case Types.CHAR: + case Types.VARCHAR: + String strVal = result.getString(i + 1); + ((PojoUtils.Setter<Object, String>) afi.setterOrGetter).set(obj, strVal); + break; + + case Types.BOOLEAN: + boolean boolVal = result.getBoolean(i + 1); + ((PojoUtils.SetterBoolean<Object>) afi.setterOrGetter).set(obj, boolVal); + break; + + case Types.TINYINT: + byte byteVal = result.getByte(i + 1); + ((PojoUtils.SetterByte<Object>) afi.setterOrGetter).set(obj, byteVal); + break; + + case Types.SMALLINT: + short shortVal = result.getShort(i + 1); + ((PojoUtils.SetterShort<Object>) afi.setterOrGetter).set(obj, shortVal); + break; + + case Types.INTEGER: + int intVal = result.getInt(i + 1); + ((PojoUtils.SetterInt<Object>) afi.setterOrGetter).set(obj, intVal); + break; + + case Types.BIGINT: + long longVal = result.getLong(i + 1); + ((PojoUtils.SetterLong<Object>) afi.setterOrGetter).set(obj, longVal); + break; + + case Types.FLOAT: + float floatVal = result.getFloat(i + 1); + ((PojoUtils.SetterFloat<Object>) afi.setterOrGetter).set(obj, floatVal); + break; + + case Types.DOUBLE: + double doubleVal = result.getDouble(i + 1); + ((PojoUtils.SetterDouble<Object>) afi.setterOrGetter).set(obj, doubleVal); + break; + + case Types.DECIMAL: + BigDecimal bdVal = result.getBigDecimal(i + 1); + ((PojoUtils.Setter<Object, BigDecimal>) afi.setterOrGetter).set(obj, bdVal); + break; + + case Types.TIMESTAMP: + Timestamp tsVal = result.getTimestamp(i + 1); + 
((PojoUtils.SetterLong<Object>) afi.setterOrGetter).set(obj, tsVal.getTime()); + break; + + case Types.TIME: + Time timeVal = result.getTime(i + 1); + ((PojoUtils.SetterLong<Object>) afi.setterOrGetter).set(obj, timeVal.getTime()); + break; + + case Types.DATE: + Date dateVal = result.getDate(i + 1); + ((PojoUtils.SetterLong<Object>) afi.setterOrGetter).set(obj, dateVal.getTime()); + break; + + default: + handleUnknownDataType(type, obj, afi); + break; + } + } + return obj; + } catch (SQLException e) { + store.disconnect(); + throw new RuntimeException("fetching metadata", e); + } + } + + @SuppressWarnings("UnusedParameters") + protected void handleUnknownDataType(int type, Object tuple, ActiveFieldInfo activeFieldInfo) + { + throw new RuntimeException("unsupported data type " + type); + } + + @Override + public String queryToRetrieveData() + { + StringBuilder builder = new StringBuilder(); + + if (query != null) { + builder.append(query.trim()); + if (builder.charAt(builder.length() - 1) == ';') { + builder.deleteCharAt(builder.length() - 1); + } + } else { + builder.append("SELECT ").append(columnsExpression).append(" FROM ").append(tableName); + if (whereCondition != null) { + builder.append(" WHERE ").append(whereCondition); + } + if (groupByClause != null) { + builder.append(" GROUP BY ").append(groupByClause); + if (havingCondition != null) { + builder.append(" HAVING ").append(havingCondition); + } + } + if (orderByExpr != null) { + builder.append(" ORDER BY ").append(orderByExpr); + } + } + if (limitSupported) { + builder.append(" LIMIT ? , ? "); + } else { + //Starting from Oracle 12c R1 there is a row limiting clause. + builder.append(" OFFSET ? ROWS FETCH NEXT ? ROWS ONLY "); + } + builder.append(";"); + String queryStr = builder.toString(); + LOG.debug("built query {}", queryStr); + return queryStr; + } + + /** + * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name. 
+ */ + public List<FieldInfo> getFieldInfos() + { + return fieldInfos; + } + + /** + * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a pojo field name.<br/> + * The value from fieldInfo.column is assigned to fieldInfo.pojoFieldExpression. + * + * @description $[].columnName name of the database column name + * @description $[].pojoFieldExpression pojo field name or expression + * @useSchema $[].pojoFieldExpression outputPort.fields[].name + */ + public void setFieldInfos(List<FieldInfo> fieldInfos) + { + this.fieldInfos = fieldInfos; + } + + /** + * @return table name + */ + public String getTableName() + { + return tableName; + } + + /** + * Sets the table name. + * + * @param tableName table name + */ + public void setTableName(String tableName) + { + this.tableName = tableName; + } + + /** + * @return where condition + */ + public String getWhereCondition() + { + return whereCondition; + } + + /** + * Sets the where condition. + * + * @param whereCondition where condition. + */ + public void setWhereCondition(String whereCondition) + { + this.whereCondition = whereCondition; + } + + /** + * @return group-by clause + */ + public String getGroupByClause() + { + return groupByClause; + } + + /** + * Sets the group by clause. + * + * @param groupByClause group-by clause. + */ + public void setGroupByClause(String groupByClause) + { + this.groupByClause = groupByClause; + } + + /** + * @return having condition + */ + public String getHavingCondition() + { + return havingCondition; + } + + /** + * Sets the having condition. + * + * @param havingCondition having condition + */ + public void setHavingCondition(String havingCondition) + { + this.havingCondition = havingCondition; + } + + /** + * @return order by expression. + */ + public String getOrderByExpr() + { + return orderByExpr; + } + + /** + * Sets the order by expression. + * + * @param orderByExpr order by expression. 
+ */ + public void setOrderByExpr(String orderByExpr) + { + this.orderByExpr = orderByExpr; + } + + /** + * @return query + */ + public String getQuery() + { + return query; + } + + /** + * Sets the query + * + * @param query query + */ + public void setQuery(String query) + { + this.query = query; + } + + /** + * @return true if database support limit clause; false otherwise + */ + public boolean isLimitSupported() + { + return limitSupported; + } + + /** + * Sets whether database supports limit clause. If it doesn't then - offset {lastRow} rows fetch next {numRows} rows only + * syntax is used. + * + * @param limitSupported true if limit is supported by database; false otherwise + */ + public void setLimitSupported(boolean limitSupported) + { + this.limitSupported = limitSupported; + } + + /** + * Creates, for every entry of {@link #columnDataTypes}, the pojo setter that matches the column's sql type and + * stores it in the {@link ActiveFieldInfo} at the same index. A type without a case below is passed to + * {@link #handleUnknownDataType(int, Object, ActiveFieldInfo)} with a null tuple. + * + * @param context operator context (not used here). + */ + @Override + public void activate(Context.OperatorContext context) + { + for (int i = 0; i < columnDataTypes.size(); i++) { + final int type = columnDataTypes.get(i); + JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo = columnFieldSetters.get(i); + switch (type) { + case (Types.CHAR): + case (Types.VARCHAR): + activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(), + String.class); + break; + + case (Types.BOOLEAN): + activeFieldInfo.setterOrGetter = PojoUtils.createSetterBoolean(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + + case (Types.TINYINT): + activeFieldInfo.setterOrGetter = PojoUtils.createSetterByte(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + + case (Types.SMALLINT): + activeFieldInfo.setterOrGetter = PojoUtils.createSetterShort(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + + case (Types.INTEGER): + activeFieldInfo.setterOrGetter = PojoUtils.createSetterInt(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + + case (Types.BIGINT): + activeFieldInfo.setterOrGetter = 
PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + + case (Types.FLOAT): + activeFieldInfo.setterOrGetter = PojoUtils.createSetterFloat(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + + case (Types.DOUBLE): + //must be a setter: getTuple casts this entry to PojoUtils.SetterDouble + activeFieldInfo.setterOrGetter = PojoUtils.createSetterDouble(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + + case Types.DECIMAL: + activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(), + BigDecimal.class); + break; + + case Types.TIMESTAMP: + activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + + case Types.TIME: + activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + + case Types.DATE: + activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + + default: + handleUnknownDataType(type, null, activeFieldInfo); + break; + } + } + } + + @Override + public void deactivate() + { + + } + + protected static class ActiveFieldInfo + { + final FieldInfo fieldInfo; + Object setterOrGetter; + + ActiveFieldInfo(FieldInfo fieldInfo) + { + this.fieldInfo = fieldInfo; + } + } + + /** + * @return batch size which is the number of rows retrieved from the database in a window. + */ + public int getBatchSize() + { + return batchSize; + } + + /** + * Sets the batch size which is the number of rows retrieved from the database in a window. + * + * @param batchSize number of rows retrieved from db in a window.
+ */ + public void setBatchSize(int batchSize) + { + this.batchSize = batchSize; + } + + public static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOInputOperator.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/af203f26/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java index 7451a50..35fcf0f 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java @@ -15,19 +15,25 @@ */ package com.datatorrent.lib.db.jdbc; +import com.datatorrent.api.Context; import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; + +import com.datatorrent.lib.util.FieldInfo; import com.datatorrent.lib.util.PojoUtils; import com.datatorrent.lib.util.PojoUtils.Getter; import com.datatorrent.lib.util.PojoUtils.GetterBoolean; -import com.datatorrent.lib.util.PojoUtils.GetterChar; import com.datatorrent.lib.util.PojoUtils.GetterDouble; import com.datatorrent.lib.util.PojoUtils.GetterFloat; import com.datatorrent.lib.util.PojoUtils.GetterInt; import com.datatorrent.lib.util.PojoUtils.GetterLong; import com.datatorrent.lib.util.PojoUtils.GetterShort; +import java.math.BigDecimal; import java.sql.*; -import java.util.ArrayList; +import java.util.List; import javax.validation.constraints.NotNull; @@ -35,6 +41,8 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; + /** * <p> * JdbcPOJOOutputOperator class.</p> @@ -46,71 +54,48 @@ 
import org.slf4j.LoggerFactory; * @since 2.1.0 */ @Evolving -public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object> +public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object> implements Operator.ActivationListener<OperatorContext> { @NotNull - private ArrayList<String> dataColumns; - //These are extracted from table metadata - private ArrayList<Integer> columnDataTypes; - - /* - * An arraylist of data column names to be set in database. - * Gets column names. - */ - public ArrayList<String> getDataColumns() - { - return dataColumns; - } + private List<FieldInfo> fieldInfos; - public void setDataColumns(ArrayList<String> dataColumns) - { - this.dataColumns = dataColumns; - } + private List<Integer> columnDataTypes; @NotNull private String tablename; - /* - * Gets the Tablename in database. - */ - public String getTablename() - { - return tablename; - } + private final transient List<JdbcPOJOInputOperator.ActiveFieldInfo> columnFieldGetters; - public void setTablename(String tablename) - { - this.tablename = tablename; - } + private String insertStatement; - /* - * An ArrayList of Java expressions that will yield the field value from the POJO. - * Each expression corresponds to one column in the database table. 
- */ - public ArrayList<String> getExpressions() - { - return expressions; - } + private transient Class<?> pojoClass; - public void setExpressions(ArrayList<String> expressions) + @InputPortFieldAnnotation(optional = true, schemaRequired = true) + public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() { - this.expressions = expressions; - } + @Override + public void setup(Context.PortContext context) + { + pojoClass = context.getValue(Context.PortContext.TUPLE_CLASS); + } - @NotNull - private ArrayList<String> expressions; - private transient ArrayList<Object> getters; - private String insertStatement; + @Override + public void process(Object t) + { + JdbcPOJOOutputOperator.super.input.process(t); + } + + }; @Override public void setup(OperatorContext context) { - StringBuilder columns = new StringBuilder(""); - StringBuilder values = new StringBuilder(""); - for (int i = 0; i < dataColumns.size(); i++) { - columns.append(dataColumns.get(i)); + StringBuilder columns = new StringBuilder(); + StringBuilder values = new StringBuilder(); + for (int i = 0; i < fieldInfos.size(); i++) { + columns.append(fieldInfos.get(i).getColumnName()); values.append("?"); - if (i < dataColumns.size() - 1) { + if (i < fieldInfos.size() - 1) { columns.append(","); values.append(","); } @@ -120,155 +105,227 @@ public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOpe + " (" + columns.toString() + ")" + " VALUES (" + values.toString() + ")"; LOG.debug("insert statement is {}", insertStatement); + super.setup(context); - Connection conn = store.getConnection(); - LOG.debug("Got Connection."); - try { - Statement st = conn.createStatement(); - ResultSet rs = st.executeQuery("select * from " + tablename); - ResultSetMetaData rsMetaData = rs.getMetaData(); + if (columnDataTypes == null) { + try { + populateColumnDataTypes(columns.toString()); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } - int numberOfColumns = 0; 
+ for (FieldInfo fi : fieldInfos) { + columnFieldGetters.add(new JdbcPOJOInputOperator.ActiveFieldInfo(fi)); + } + } - numberOfColumns = rsMetaData.getColumnCount(); + protected void populateColumnDataTypes(String columns) throws SQLException + { + columnDataTypes = Lists.newArrayList(); + try (Statement st = store.getConnection().createStatement()) { + ResultSet rs = st.executeQuery("select " + columns + " from " + tablename); - LOG.debug("resultSet MetaData column Count=" + numberOfColumns); + ResultSetMetaData rsMetaData = rs.getMetaData(); + LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount()); - for (int i = 1; i <= numberOfColumns; i++) { - // get the designated column's SQL type. + for (int i = 1; i <= rsMetaData.getColumnCount(); i++) { int type = rsMetaData.getColumnType(i); - LOG.debug("column name {}", rsMetaData.getColumnTypeName(i)); columnDataTypes.add(type); - LOG.debug("sql column type is " + type); + LOG.debug("column name {} type {}", rsMetaData.getColumnName(i), type); } } - catch (SQLException ex) { - throw new RuntimeException(ex); - } - } public JdbcPOJOOutputOperator() { super(); - columnDataTypes = new ArrayList<Integer>(); - getters = new ArrayList<Object>(); + columnFieldGetters = Lists.newArrayList(); } @Override - public void processTuple(Object tuple) + protected String getUpdateCommand() { - if (getters.isEmpty()) { - processFirstTuple(tuple); - } - super.processTuple(tuple); + LOG.debug("insert statement is {}", insertStatement); + return insertStatement; } - public void processFirstTuple(Object tuple) + @Override + @SuppressWarnings("unchecked") + protected void setStatementParameters(PreparedStatement statement, Object tuple) throws SQLException { - final Class<?> fqcn = tuple.getClass(); final int size = columnDataTypes.size(); for (int i = 0; i < size; i++) { final int type = columnDataTypes.get(i); - final String getterExpression = expressions.get(i); - final Object getter; + 
JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo = columnFieldGetters.get(i); switch (type) { - case Types.CHAR: - getter = PojoUtils.createGetterChar(fqcn, getterExpression); + case (Types.CHAR): + case (Types.VARCHAR): + statement.setString(i + 1, ((Getter<Object, String>) activeFieldInfo.setterOrGetter).get(tuple)); break; - case Types.VARCHAR: - getter = PojoUtils.createGetter(fqcn, getterExpression, String.class); + + case (Types.BOOLEAN): + statement.setBoolean(i + 1, ((GetterBoolean<Object>) activeFieldInfo.setterOrGetter).get(tuple)); break; - case Types.BOOLEAN: - case Types.TINYINT: - getter = PojoUtils.createGetterBoolean(fqcn, getterExpression); + + case (Types.TINYINT): + statement.setByte(i + 1, ((PojoUtils.GetterByte<Object>) activeFieldInfo.setterOrGetter).get(tuple)); break; - case Types.SMALLINT: - getter = PojoUtils.createGetterShort(fqcn, getterExpression); + + case (Types.SMALLINT): + statement.setShort(i + 1, ((GetterShort<Object>) activeFieldInfo.setterOrGetter).get(tuple)); break; - case Types.INTEGER: - getter = PojoUtils.createGetterInt(fqcn, getterExpression); + + case (Types.INTEGER): + statement.setInt(i + 1, ((GetterInt<Object>) activeFieldInfo.setterOrGetter).get(tuple)); break; - case Types.BIGINT: - getter = PojoUtils.createGetterLong(fqcn, getterExpression); + + case (Types.BIGINT): + statement.setLong(i + 1, ((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple)); break; - case Types.FLOAT: - getter = PojoUtils.createGetterFloat(fqcn, getterExpression); + + case (Types.FLOAT): + statement.setFloat(i + 1, ((GetterFloat<Object>) activeFieldInfo.setterOrGetter).get(tuple)); break; - case Types.DOUBLE: - getter = PojoUtils.createGetterDouble(fqcn, getterExpression); + + case (Types.DOUBLE): + statement.setDouble(i + 1, ((GetterDouble<Object>) activeFieldInfo.setterOrGetter).get(tuple)); + break; + + case Types.DECIMAL: + statement.setBigDecimal(i + 1, ((Getter<Object, BigDecimal>) 
activeFieldInfo.setterOrGetter).get(tuple)); + break; + + case Types.TIMESTAMP: + statement.setTimestamp(i + 1, new Timestamp(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple))); + break; + + case Types.TIME: + statement.setTime(i + 1, new Time(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple))); break; + + case Types.DATE: + statement.setDate(i + 1, new Date(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple))); + break; + default: - /* - Types.DECIMAL - Types.DATE - Types.TIME - Types.ARRAY - Types.OTHER - */ - getter = PojoUtils.createGetter(fqcn, getterExpression, Object.class); + handleUnknownDataType(type, tuple, activeFieldInfo); break; } - getters.add(getter); } + } + @SuppressWarnings("UnusedParameters") + protected void handleUnknownDataType(int type, Object tuple, JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo) + { + throw new RuntimeException("unsupported data type " + type); } - @Override - protected String getUpdateCommand() + /** + * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name. + */ + public List<FieldInfo> getFieldInfos() { - LOG.debug("insertstatement is {}", insertStatement); - return insertStatement; + return fieldInfos; + } + + /** + * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a pojo field name.<br/> + * The value from fieldInfo.column is assigned to fieldInfo.pojoFieldExpression. + * + * @description $[].columnName name of the database column name + * @description $[].pojoFieldExpression pojo field name or expression + * @useSchema $[].pojoFieldExpression input.fields[].name + */ + public void setFieldInfos(List<FieldInfo> fieldInfos) + { + this.fieldInfos = fieldInfos; + } + + /* + * Gets the name of the table in database. 
+ */ + public String getTablename() + { + return tablename; + } + + public void setTablename(String tablename) + { + this.tablename = tablename; } + private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOOutputOperator.class); + @Override - @SuppressWarnings("unchecked") - protected void setStatementParameters(PreparedStatement statement, Object tuple) throws SQLException + public void activate(OperatorContext context) { final int size = columnDataTypes.size(); for (int i = 0; i < size; i++) { final int type = columnDataTypes.get(i); + JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo = columnFieldGetters.get(i); switch (type) { case (Types.CHAR): - statement.setString(i + 1, ((Getter<Object, String>)getters.get(i)).get(tuple)); - break; case (Types.VARCHAR): - statement.setString(i + 1, ((Getter<Object, String>)getters.get(i)).get(tuple)); + activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(), + String.class); break; + case (Types.BOOLEAN): + activeFieldInfo.setterOrGetter = PojoUtils.createGetterBoolean(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + case (Types.TINYINT): - statement.setBoolean(i + 1, ((GetterBoolean<Object>)getters.get(i)).get(tuple)); + activeFieldInfo.setterOrGetter = PojoUtils.createGetterByte(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; + case (Types.SMALLINT): - statement.setShort(i + 1, ((GetterShort<Object>)getters.get(i)).get(tuple)); + activeFieldInfo.setterOrGetter = PojoUtils.createGetterShort(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; + case (Types.INTEGER): - statement.setInt(i + 1, ((GetterInt<Object>)getters.get(i)).get(tuple)); + activeFieldInfo.setterOrGetter = PojoUtils.createGetterInt(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; + case (Types.BIGINT): - statement.setLong(i + 1, ((GetterLong<Object>)getters.get(i)).get(tuple)); + 
activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; + case (Types.FLOAT): - statement.setFloat(i + 1, ((GetterFloat<Object>)getters.get(i)).get(tuple)); + activeFieldInfo.setterOrGetter = PojoUtils.createGetterFloat(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; + case (Types.DOUBLE): - statement.setDouble(i + 1, ((GetterDouble<Object>)getters.get(i)).get(tuple)); + activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + + case Types.DECIMAL: + activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(), + BigDecimal.class); + break; + + case Types.TIMESTAMP: + activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + + case Types.TIME: + activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; + + case Types.DATE: + activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + break; + default: - /* - Types.DECIMAL - Types.DATE - Types.TIME - Types.ARRAY - Types.OTHER - */ - statement.setObject(i + 1, ((Getter<Object, Object>)getters.get(i)).get(tuple)); + handleUnknownDataType(type, null, activeFieldInfo); break; } } } - private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOOutputOperator.class); - + @Override + public void deactivate() + { + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/af203f26/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java index 
82e613a..4c4c004 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java @@ -44,7 +44,7 @@ public class JdbcStore implements Connectable private String databaseUrl; @NotNull private String databaseDriver; - private final Properties connectionProperties; + private Properties connectionProperties; protected transient Connection connection = null; /* @@ -115,10 +115,10 @@ public class JdbcStore implements Connectable } /** - * Connection Properties for JDBC Connection. - * Sets the properties on the jdbc connection. + * Sets the connection properties on JDBC connection. Connection properties are provided as a string. + * * @param connectionProps Comma separated list of properties. Property key and value are separated by colon. - * eg. user:xyz,password:ijk + * eg. user:xyz,password:ijk */ public void setConnectionProperties(String connectionProps) { @@ -131,8 +131,17 @@ public class JdbcStore implements Connectable } /** - * Connection Properties for JDBC Connection. - * Gets the properties on the jdbc connection. + * Sets the connection properties on JDBC connection. + * + * @param connectionProperties connection properties. + */ + public void setConnectionProperties(Properties connectionProperties) + { + this.connectionProperties = connectionProperties; + } + + /** + * Get the connection properties of JDBC connection. 
*/ public Properties getConnectionProperties() { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/af203f26/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java index 4cca31f..c77fb73 100644 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java @@ -26,12 +26,15 @@ import org.junit.Test; import com.google.common.collect.Lists; +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; import com.datatorrent.api.DAG; -import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.helper.TestPortContext; import com.datatorrent.lib.testbench.CollectorTestSink; -import java.util.ArrayList; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.netlet.util.DTThrowable; /** * Tests for {@link AbstractJdbcTransactionableOutputOperator} and {@link AbstractJdbcInputOperator} @@ -103,19 +106,18 @@ public class JdbcOperatorTest Statement stmt = con.createStatement(); String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " - + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " - + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " - + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " - + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " - + ")"; + + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " + + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " + + 
JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " + + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " + + ")"; stmt.executeUpdate(createMetaTable); String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (ID INTEGER)"; stmt.executeUpdate(createTable); String createPOJOTable = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME + "(id INTEGER not NULL,name VARCHAR(255), PRIMARY KEY ( id ))"; stmt.executeUpdate(createPOJOTable); - } - catch (Throwable e) { + } catch (Throwable e) { DTThrowable.rethrow(e); } } @@ -131,8 +133,23 @@ public class JdbcOperatorTest cleanTable = "delete from " + JdbcTransactionalStore.DEFAULT_META_TABLE; stmt.executeUpdate(cleanTable); + } catch (SQLException e) { + throw new RuntimeException(e); } - catch (SQLException e) { + } + + public static void insertEventsInTable(int numEvents) + { + try { + Connection con = DriverManager.getConnection(URL); + String insert = "insert into " + TABLE_NAME + " values (?)"; + PreparedStatement stmt = con.prepareStatement(insert); + + for (int i = 0; i < numEvents; i++) { + stmt.setInt(1, i); + stmt.executeUpdate(); + } + } catch (SQLException e) { throw new RuntimeException(e); } } @@ -170,8 +187,7 @@ public class JdbcOperatorTest ResultSet resultSet = stmt.executeQuery(countQuery); resultSet.next(); return resultSet.getInt(1); - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException("fetching count", e); } } @@ -195,8 +211,7 @@ public class JdbcOperatorTest ResultSet resultSet = stmt.executeQuery(countQuery); resultSet.next(); return resultSet.getInt(1); - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException("fetching count", e); } } @@ -218,8 +233,7 @@ public class JdbcOperatorTest { try { return new TestEvent(result.getInt(1)); - } - catch (SQLException e) { + } catch (SQLException e) { throw new 
RuntimeException(e); } } @@ -229,23 +243,6 @@ public class JdbcOperatorTest { return retrieveQuery; } - - public void insertEventsInTable(int numEvents) - { - try { - Connection con = DriverManager.getConnection(URL); - String insert = "insert into " + TABLE_NAME + " values (?)"; - PreparedStatement stmt = con.prepareStatement(insert); - - for (int i = 0; i < numEvents; i++) { - stmt.setInt(1, i); - stmt.executeUpdate(); - } - } - catch (SQLException e) { - throw new RuntimeException(e); - } - } } @Test @@ -281,7 +278,7 @@ public class JdbcOperatorTest } @Test - public void testJdbcPOJOOutputOperator() + public void testJdbcPojoOutputOperator() { JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore(); transactionalStore.setDatabaseDriver(DB_DRIVER); @@ -294,25 +291,30 @@ public class JdbcOperatorTest TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator(); outputOperator.setBatchSize(3); outputOperator.setTablename(TABLE_POJO_NAME); - ArrayList<String> dataColumns = new ArrayList<String>(); - dataColumns.add("id"); - dataColumns.add("name"); - outputOperator.setDataColumns(dataColumns); + + List<FieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new FieldInfo("ID", "id", null)); + fieldInfos.add(new FieldInfo("NAME", "name", null)); + outputOperator.setFieldInfos(fieldInfos); + outputOperator.setStore(transactionalStore); - ArrayList<String> expressions = new ArrayList<String>(); - expressions.add("getId()"); - expressions.add("getName()"); - outputOperator.setExpressions(expressions); outputOperator.setup(context); + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + TestPortContext tpc = new TestPortContext(portAttributes); + outputOperator.input.setup(tpc); + + outputOperator.activate(context); + List<TestPOJOEvent> events = Lists.newArrayList(); for (int i = 0; i < 10; i++) { 
events.add(new TestPOJOEvent(i, "test" + i)); } outputOperator.beginWindow(0); - for (TestPOJOEvent event: events) { + for (TestPOJOEvent event : events) { outputOperator.input.process(event); } outputOperator.endWindow(); @@ -321,7 +323,7 @@ public class JdbcOperatorTest } @Test - public void TestJdbcInputOperator() + public void testJdbcInputOperator() { JdbcStore store = new JdbcStore(); store.setDatabaseDriver(DB_DRIVER); @@ -333,7 +335,7 @@ public class JdbcOperatorTest TestInputOperator inputOperator = new TestInputOperator(); inputOperator.setStore(store); - inputOperator.insertEventsInTable(10); + insertEventsInTable(10); CollectorTestSink<Object> sink = new CollectorTestSink<Object>(); inputOperator.outputPort.setSink(sink); @@ -345,5 +347,53 @@ public class JdbcOperatorTest Assert.assertEquals("rows from db", 10, sink.collectedTuples.size()); } + + @Test + public void testJdbcPojoInputOperator() + { + JdbcStore store = new JdbcStore(); + store.setDatabaseDriver(DB_DRIVER); + store.setDatabaseUrl(URL); + + Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap); + + insertEventsInTable(10); + + JdbcPOJOInputOperator inputOperator = new JdbcPOJOInputOperator(); + inputOperator.setStore(store); + inputOperator.setTableName(TABLE_NAME); + + List<FieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new FieldInfo("ID", "id", null)); + inputOperator.setFieldInfos(fieldInfos); + + inputOperator.setBatchSize(5); + + CollectorTestSink<Object> sink = new CollectorTestSink<>(); + inputOperator.outputPort.setSink(sink); + + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class); + TestPortContext 
tpc = new TestPortContext(portAttributes); + + inputOperator.setup(context); + inputOperator.outputPort.setup(tpc); + + inputOperator.activate(context); + + inputOperator.beginWindow(0); + inputOperator.emitTuples(); + inputOperator.endWindow(); + + Assert.assertEquals("rows from db", 5, sink.collectedTuples.size()); + + inputOperator.beginWindow(1); + inputOperator.emitTuples(); + inputOperator.endWindow(); + + Assert.assertEquals("rows from db", 10, sink.collectedTuples.size()); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/af203f26/library/src/test/java/com/datatorrent/lib/helper/TestPortContext.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/helper/TestPortContext.java b/library/src/test/java/com/datatorrent/lib/helper/TestPortContext.java new file mode 100644 index 0000000..8bd72a6 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/helper/TestPortContext.java @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datatorrent.lib.helper; + +import java.util.Collection; + +import javax.validation.constraints.NotNull; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; + +public class TestPortContext implements Context.PortContext +{ + public final Attribute.AttributeMap attributeMap; + + public TestPortContext(@NotNull Attribute.AttributeMap attributeMap) + { + this.attributeMap = Preconditions.checkNotNull(attributeMap, "attributes"); + } + + @Override + public Attribute.AttributeMap getAttributes() + { + return attributeMap; + } + + @Override + public <T> T getValue(Attribute<T> key) + { + return attributeMap.get(key); + } + + @Override + public void setCounters(Object counters) + { + + } + + @Override + public void sendMetrics(Collection<String> metricNames) + { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/af203f26/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1468163..c9f5397 100644 --- a/pom.xml +++ b/pom.xml @@ -80,6 +80,9 @@ <onlyBinaryIncompatible>false</onlyBinaryIncompatible> <includeSynthetic>false</includeSynthetic> <ignoreMissingClasses>true</ignoreMissingClasses> + <excludes> + <exclude>*POJO*</exclude> + </excludes> </parameter> <skip>${semver.plugin.skip}</skip> </configuration>
