[FLINK-3472] [jdbc] Give a better exception if a JDBC column has a null value
This closes #1772 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e719eddb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e719eddb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e719eddb Branch: refs/heads/master Commit: e719eddb9606351baccb45491f473563c2fe61fa Parents: 2e78a36 Author: zentol <[email protected]> Authored: Wed Mar 9 11:11:31 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Sun Mar 13 16:03:04 2016 +0100 ---------------------------------------------------------------------- .../flink/api/java/io/jdbc/JDBCInputFormat.java | 168 ++++++++++--------- 1 file changed, 86 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e719eddb/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java index 132edc4..b764350 100644 --- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java +++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java @@ -191,89 +191,93 @@ public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, Inp * * @param reuse Target Record. 
*/ - private void addValue(OUT reuse) throws SQLException { + private void addValue(OUT reuse) throws IOException, SQLException { for (int pos = 0; pos < columnTypes.length; pos++) { - switch (columnTypes[pos]) { - case java.sql.Types.NULL: - reuse.setField(NullValue.getInstance(), pos); - break; - case java.sql.Types.BOOLEAN: - reuse.setField(resultSet.getBoolean(pos + 1), pos); - break; - case java.sql.Types.BIT: - reuse.setField(resultSet.getBoolean(pos + 1), pos); - break; - case java.sql.Types.CHAR: - reuse.setField(resultSet.getString(pos + 1), pos); - break; - case java.sql.Types.NCHAR: - reuse.setField(resultSet.getString(pos + 1), pos); - break; - case java.sql.Types.VARCHAR: - reuse.setField(resultSet.getString(pos + 1), pos); - break; - case java.sql.Types.LONGVARCHAR: - reuse.setField(resultSet.getString(pos + 1), pos); - break; - case java.sql.Types.LONGNVARCHAR: - reuse.setField(resultSet.getString(pos + 1), pos); - break; - case java.sql.Types.TINYINT: - reuse.setField(resultSet.getShort(pos + 1), pos); - break; - case java.sql.Types.SMALLINT: - reuse.setField(resultSet.getShort(pos + 1), pos); - break; - case java.sql.Types.BIGINT: - reuse.setField(resultSet.getLong(pos + 1), pos); - break; - case java.sql.Types.INTEGER: - reuse.setField(resultSet.getInt(pos + 1), pos); - break; - case java.sql.Types.FLOAT: - reuse.setField(resultSet.getDouble(pos + 1), pos); - break; - case java.sql.Types.REAL: - reuse.setField(resultSet.getFloat(pos + 1), pos); - break; - case java.sql.Types.DOUBLE: - reuse.setField(resultSet.getDouble(pos + 1), pos); - break; - case java.sql.Types.DECIMAL: - reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos); - break; - case java.sql.Types.NUMERIC: - reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos); - break; - case java.sql.Types.DATE: - reuse.setField(resultSet.getDate(pos + 1).toString(), pos); - break; - case java.sql.Types.TIME: - reuse.setField(resultSet.getTime(pos + 1).getTime(), pos); - 
break; - case java.sql.Types.TIMESTAMP: - reuse.setField(resultSet.getTimestamp(pos + 1).toString(), pos); - break; - case java.sql.Types.SQLXML: - reuse.setField(resultSet.getSQLXML(pos + 1).toString(), pos); - break; - default: - throw new SQLException("Unsupported sql-type [" + columnTypes[pos] + "] on column [" + pos + "]"); - - // case java.sql.Types.BINARY: - // case java.sql.Types.VARBINARY: - // case java.sql.Types.LONGVARBINARY: - // case java.sql.Types.ARRAY: - // case java.sql.Types.JAVA_OBJECT: - // case java.sql.Types.BLOB: - // case java.sql.Types.CLOB: - // case java.sql.Types.NCLOB: - // case java.sql.Types.DATALINK: - // case java.sql.Types.DISTINCT: - // case java.sql.Types.OTHER: - // case java.sql.Types.REF: - // case java.sql.Types.ROWID: - // case java.sql.Types.STRUCT: + try { + switch (columnTypes[pos]) { + case java.sql.Types.NULL: + reuse.setField(NullValue.getInstance(), pos); + break; + case java.sql.Types.BOOLEAN: + reuse.setField(resultSet.getBoolean(pos + 1), pos); + break; + case java.sql.Types.BIT: + reuse.setField(resultSet.getBoolean(pos + 1), pos); + break; + case java.sql.Types.CHAR: + reuse.setField(resultSet.getString(pos + 1), pos); + break; + case java.sql.Types.NCHAR: + reuse.setField(resultSet.getString(pos + 1), pos); + break; + case java.sql.Types.VARCHAR: + reuse.setField(resultSet.getString(pos + 1), pos); + break; + case java.sql.Types.LONGVARCHAR: + reuse.setField(resultSet.getString(pos + 1), pos); + break; + case java.sql.Types.LONGNVARCHAR: + reuse.setField(resultSet.getString(pos + 1), pos); + break; + case java.sql.Types.TINYINT: + reuse.setField(resultSet.getShort(pos + 1), pos); + break; + case java.sql.Types.SMALLINT: + reuse.setField(resultSet.getShort(pos + 1), pos); + break; + case java.sql.Types.BIGINT: + reuse.setField(resultSet.getLong(pos + 1), pos); + break; + case java.sql.Types.INTEGER: + reuse.setField(resultSet.getInt(pos + 1), pos); + break; + case java.sql.Types.FLOAT: + 
reuse.setField(resultSet.getDouble(pos + 1), pos); + break; + case java.sql.Types.REAL: + reuse.setField(resultSet.getFloat(pos + 1), pos); + break; + case java.sql.Types.DOUBLE: + reuse.setField(resultSet.getDouble(pos + 1), pos); + break; + case java.sql.Types.DECIMAL: + reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos); + break; + case java.sql.Types.NUMERIC: + reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos); + break; + case java.sql.Types.DATE: + reuse.setField(resultSet.getDate(pos + 1).toString(), pos); + break; + case java.sql.Types.TIME: + reuse.setField(resultSet.getTime(pos + 1).getTime(), pos); + break; + case java.sql.Types.TIMESTAMP: + reuse.setField(resultSet.getTimestamp(pos + 1).toString(), pos); + break; + case java.sql.Types.SQLXML: + reuse.setField(resultSet.getSQLXML(pos + 1).toString(), pos); + break; + default: + throw new SQLException("Unsupported sql-type [" + columnTypes[pos] + "] on column [" + pos + "]"); + + // case java.sql.Types.BINARY: + // case java.sql.Types.VARBINARY: + // case java.sql.Types.LONGVARBINARY: + // case java.sql.Types.ARRAY: + // case java.sql.Types.JAVA_OBJECT: + // case java.sql.Types.BLOB: + // case java.sql.Types.CLOB: + // case java.sql.Types.NCLOB: + // case java.sql.Types.DATALINK: + // case java.sql.Types.DISTINCT: + // case java.sql.Types.OTHER: + // case java.sql.Types.REF: + // case java.sql.Types.ROWID: + // case java.sql.Types.STRUCT: + } + } catch (NullPointerException npe) { + throw new IOException("Encountered null value for column " + pos + ". Decimal, Numeric, Date, Time, Timestamp and SQLXML columns may not contain NULL values."); } } }
