Repository: apex-malhar Updated Branches: refs/heads/master 87d8aef7c -> c3d3a880d
APEXMALHAR-2290 fix to optimize the function which was populating meta data for columns Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c3d3a880 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c3d3a880 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c3d3a880 Branch: refs/heads/master Commit: c3d3a880d17181b628d28874545f5ba34effefd0 Parents: 87d8aef Author: Hitesh-Scorpio <[email protected]> Authored: Wed Oct 12 20:30:22 2016 +0530 Committer: Hitesh-Scorpio <[email protected]> Committed: Wed Oct 26 12:09:02 2016 +0530 ---------------------------------------------------------------------- .../db/jdbc/JdbcPOJOInsertOutputOperator.java | 71 +++++++++++++------- 1 file changed, 45 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3d3a880/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java index f5e6081..706757a 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java @@ -19,11 +19,11 @@ package com.datatorrent.lib.db.jdbc; import java.lang.reflect.Field; +import java.sql.DatabaseMetaData; import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.sql.Statement; import java.sql.Types; +import java.util.HashSet; import java.util.List; import org.slf4j.Logger; @@ -60,23 +60,31 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator // Populate columnNames and columnDataTypes try { + columnNames = Lists.newArrayList(); + columnDataTypes = Lists.newArrayList(); + columnNullabilities = Lists.newArrayList(); + /** + * columnNamesSet is the set having column names given by the user + */ + HashSet<String> columnNamesSet = new HashSet<>(); if (getFieldInfos() == null) { // then assume direct mapping LOG.info("FieldInfo missing. Assuming direct mapping between POJO fields and DB columns"); - populateColumnDataTypes(null); } else { // FieldInfo supplied by user StringBuilder columns = new StringBuilder(); StringBuilder values = new StringBuilder(); for (int i = 0; i < getFieldInfos().size(); i++) { - columns.append(getFieldInfos().get(i).getColumnName()); + String columnName = getFieldInfos().get(i).getColumnName(); + columns.append(columnName); values.append("?"); if (i < getFieldInfos().size() - 1) { columns.append(","); values.append(","); } + columnNamesSet.add(columnName.toUpperCase()); } - populateColumnDataTypes(columns.toString()); } + populateColumnDataTypes(columnNamesSet); } catch (SQLException e) { throw new RuntimeException(e); } @@ -93,7 +101,7 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator String columnName = columnNames.get(i); String pojoField = getMatchingField(fields, columnName); - if (columnNullabilities.get(i) == ResultSetMetaData.columnNoNulls && + if (columnNullabilities.get(i) == DatabaseMetaData.columnNoNulls && (pojoField == null || pojoField.length() == 0)) { throw new RuntimeException("Data for a non-nullable field: " + columnName + " not found in POJO"); } else { @@ -146,28 +154,39 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator return null; } - protected void populateColumnDataTypes(String columns) throws SQLException + /** + * Function to populate Meta Data. + * @param columnNamesSet is a set having column names given by the user + * @throws SQLException + */ + protected void populateColumnDataTypes(HashSet<String> columnNamesSet) throws SQLException { - columnNames = Lists.newArrayList(); - columnDataTypes = Lists.newArrayList(); - columnNullabilities = Lists.newArrayList(); - - try (Statement st = store.getConnection().createStatement()) { - if (columns == null || columns.length() == 0) { - columns = "*"; + ResultSet rsColumns; + DatabaseMetaData meta = store.getConnection().getMetaData(); + /**Identifiers (table names, column names etc.) may be stored internally in either uppercase or lowercase.**/ + rsColumns = meta.getColumns(null, null, getTablename().toUpperCase(), null); + if (!rsColumns.isBeforeFirst()) { + rsColumns = meta.getColumns(null, null, getTablename().toLowerCase(), null); + if (!rsColumns.isBeforeFirst()) { + /** If the table name is in quotes then some Databases store it without doing any uppercase or lowercase conversions */ + rsColumns = meta.getColumns(null, null, getTablename(), null); + if (!rsColumns.isBeforeFirst()) { + throw new RuntimeException("Table name not found"); + } } - ResultSet rs = st.executeQuery("select " + columns + " from " + getTablename()); - - ResultSetMetaData rsMetaData = rs.getMetaData(); - LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount()); - - for (int i = 1; i <= rsMetaData.getColumnCount(); i++) { - int type = rsMetaData.getColumnType(i); - String columnName = rsMetaData.getColumnName(i); - columnNames.add(columnName); - columnDataTypes.add(type); - columnNullabilities.add(rsMetaData.isNullable(i)); - LOG.debug("column name {} type {}", rsMetaData.getColumnName(i), type); + } + boolean readAllColumns = columnNamesSet.size() == 0 ? true : false; + int remainingColumns = columnNamesSet.size(); + while (rsColumns.next()) { + if (readAllColumns || remainingColumns > 0) { + if (readAllColumns || columnNamesSet.contains(rsColumns.getString("COLUMN_NAME").toUpperCase())) { + columnNames.add(rsColumns.getString("COLUMN_NAME")); + columnNullabilities.add(rsColumns.getInt("NULLABLE")); + columnDataTypes.add(rsColumns.getInt("DATA_TYPE")); + remainingColumns--; + } + } else { + break; } } }
