[FLINK-3472] [jdbc] Give a better exception if jdbc column has a null value

This closes #1772


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e719eddb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e719eddb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e719eddb

Branch: refs/heads/master
Commit: e719eddb9606351baccb45491f473563c2fe61fa
Parents: 2e78a36
Author: zentol <[email protected]>
Authored: Wed Mar 9 11:11:31 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Sun Mar 13 16:03:04 2016 +0100

----------------------------------------------------------------------
 .../flink/api/java/io/jdbc/JDBCInputFormat.java | 168 ++++++++++---------
 1 file changed, 86 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e719eddb/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 
b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
index 132edc4..b764350 100644
--- 
a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ 
b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -191,89 +191,93 @@ public class JDBCInputFormat<OUT extends Tuple> extends 
RichInputFormat<OUT, Inp
         *
         * @param reuse Target Record.
         */
-       private void addValue(OUT reuse) throws SQLException {
+       private void addValue(OUT reuse) throws IOException, SQLException {
                for (int pos = 0; pos < columnTypes.length; pos++) {
-                       switch (columnTypes[pos]) {
-                               case java.sql.Types.NULL:
-                                       reuse.setField(NullValue.getInstance(), 
pos);
-                                       break;
-                               case java.sql.Types.BOOLEAN:
-                                       reuse.setField(resultSet.getBoolean(pos 
+ 1), pos);
-                                       break;
-                               case java.sql.Types.BIT:
-                                       reuse.setField(resultSet.getBoolean(pos 
+ 1), pos);
-                                       break;
-                               case java.sql.Types.CHAR:
-                                       reuse.setField(resultSet.getString(pos 
+ 1), pos);
-                                       break;
-                               case java.sql.Types.NCHAR:
-                                       reuse.setField(resultSet.getString(pos 
+ 1), pos);
-                                       break;
-                               case java.sql.Types.VARCHAR:
-                                       reuse.setField(resultSet.getString(pos 
+ 1), pos);
-                                       break;
-                               case java.sql.Types.LONGVARCHAR:
-                                       reuse.setField(resultSet.getString(pos 
+ 1), pos);
-                                       break;
-                               case java.sql.Types.LONGNVARCHAR:
-                                       reuse.setField(resultSet.getString(pos 
+ 1), pos);
-                                       break;
-                               case java.sql.Types.TINYINT:
-                                       reuse.setField(resultSet.getShort(pos + 
1), pos);
-                                       break;
-                               case java.sql.Types.SMALLINT:
-                                       reuse.setField(resultSet.getShort(pos + 
1), pos);
-                                       break;
-                               case java.sql.Types.BIGINT:
-                                       reuse.setField(resultSet.getLong(pos + 
1), pos);
-                                       break;
-                               case java.sql.Types.INTEGER:
-                                       reuse.setField(resultSet.getInt(pos + 
1), pos);
-                                       break;
-                               case java.sql.Types.FLOAT:
-                                       reuse.setField(resultSet.getDouble(pos 
+ 1), pos);
-                                       break;
-                               case java.sql.Types.REAL:
-                                       reuse.setField(resultSet.getFloat(pos + 
1), pos);
-                                       break;
-                               case java.sql.Types.DOUBLE:
-                                       reuse.setField(resultSet.getDouble(pos 
+ 1), pos);
-                                       break;
-                               case java.sql.Types.DECIMAL:
-                                       
reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos);
-                                       break;
-                               case java.sql.Types.NUMERIC:
-                                       
reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos);
-                                       break;
-                               case java.sql.Types.DATE:
-                                       reuse.setField(resultSet.getDate(pos + 
1).toString(), pos);
-                                       break;
-                               case java.sql.Types.TIME:
-                                       reuse.setField(resultSet.getTime(pos + 
1).getTime(), pos);
-                                       break;
-                               case java.sql.Types.TIMESTAMP:
-                                       
reuse.setField(resultSet.getTimestamp(pos + 1).toString(), pos);
-                                       break;
-                               case java.sql.Types.SQLXML:
-                                       reuse.setField(resultSet.getSQLXML(pos 
+ 1).toString(), pos);
-                                       break;
-                               default:
-                                       throw new SQLException("Unsupported 
sql-type [" + columnTypes[pos] + "] on column [" + pos + "]");
-
-                               // case java.sql.Types.BINARY:
-                               // case java.sql.Types.VARBINARY:
-                               // case java.sql.Types.LONGVARBINARY:
-                               // case java.sql.Types.ARRAY:
-                               // case java.sql.Types.JAVA_OBJECT:
-                               // case java.sql.Types.BLOB:
-                               // case java.sql.Types.CLOB:
-                               // case java.sql.Types.NCLOB:
-                               // case java.sql.Types.DATALINK:
-                               // case java.sql.Types.DISTINCT:
-                               // case java.sql.Types.OTHER:
-                               // case java.sql.Types.REF:
-                               // case java.sql.Types.ROWID:
-                               // case java.sql.Types.STRUCT:
+                       try {
+                               switch (columnTypes[pos]) {
+                                       case java.sql.Types.NULL:
+                                               
reuse.setField(NullValue.getInstance(), pos);
+                                               break;
+                                       case java.sql.Types.BOOLEAN:
+                                               
reuse.setField(resultSet.getBoolean(pos + 1), pos);
+                                               break;
+                                       case java.sql.Types.BIT:
+                                               
reuse.setField(resultSet.getBoolean(pos + 1), pos);
+                                               break;
+                                       case java.sql.Types.CHAR:
+                                               
reuse.setField(resultSet.getString(pos + 1), pos);
+                                               break;
+                                       case java.sql.Types.NCHAR:
+                                               
reuse.setField(resultSet.getString(pos + 1), pos);
+                                               break;
+                                       case java.sql.Types.VARCHAR:
+                                               
reuse.setField(resultSet.getString(pos + 1), pos);
+                                               break;
+                                       case java.sql.Types.LONGVARCHAR:
+                                               
reuse.setField(resultSet.getString(pos + 1), pos);
+                                               break;
+                                       case java.sql.Types.LONGNVARCHAR:
+                                               
reuse.setField(resultSet.getString(pos + 1), pos);
+                                               break;
+                                       case java.sql.Types.TINYINT:
+                                               
reuse.setField(resultSet.getShort(pos + 1), pos);
+                                               break;
+                                       case java.sql.Types.SMALLINT:
+                                               
reuse.setField(resultSet.getShort(pos + 1), pos);
+                                               break;
+                                       case java.sql.Types.BIGINT:
+                                               
reuse.setField(resultSet.getLong(pos + 1), pos);
+                                               break;
+                                       case java.sql.Types.INTEGER:
+                                               
reuse.setField(resultSet.getInt(pos + 1), pos);
+                                               break;
+                                       case java.sql.Types.FLOAT:
+                                               
reuse.setField(resultSet.getDouble(pos + 1), pos);
+                                               break;
+                                       case java.sql.Types.REAL:
+                                               
reuse.setField(resultSet.getFloat(pos + 1), pos);
+                                               break;
+                                       case java.sql.Types.DOUBLE:
+                                               
reuse.setField(resultSet.getDouble(pos + 1), pos);
+                                               break;
+                                       case java.sql.Types.DECIMAL:
+                                               
reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos);
+                                               break;
+                                       case java.sql.Types.NUMERIC:
+                                               
reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos);
+                                               break;
+                                       case java.sql.Types.DATE:
+                                               
reuse.setField(resultSet.getDate(pos + 1).toString(), pos);
+                                               break;
+                                       case java.sql.Types.TIME:
+                                               
reuse.setField(resultSet.getTime(pos + 1).getTime(), pos);
+                                               break;
+                                       case java.sql.Types.TIMESTAMP:
+                                               
reuse.setField(resultSet.getTimestamp(pos + 1).toString(), pos);
+                                               break;
+                                       case java.sql.Types.SQLXML:
+                                               
reuse.setField(resultSet.getSQLXML(pos + 1).toString(), pos);
+                                               break;
+                                       default:
+                                               throw new 
SQLException("Unsupported sql-type [" + columnTypes[pos] + "] on column [" + 
pos + "]");
+
+                                               // case java.sql.Types.BINARY:
+                                               // case 
java.sql.Types.VARBINARY:
+                                               // case 
java.sql.Types.LONGVARBINARY:
+                                               // case java.sql.Types.ARRAY:
+                                               // case 
java.sql.Types.JAVA_OBJECT:
+                                               // case java.sql.Types.BLOB:
+                                               // case java.sql.Types.CLOB:
+                                               // case java.sql.Types.NCLOB:
+                                               // case java.sql.Types.DATALINK:
+                                               // case java.sql.Types.DISTINCT:
+                                               // case java.sql.Types.OTHER:
+                                               // case java.sql.Types.REF:
+                                               // case java.sql.Types.ROWID:
+                                               // case java.sql.Types.STRUCT:
+                               }
+                       } catch (NullPointerException npe) {
+                               throw new IOException("Encountered null value 
for column " + pos + ". Decimal, Numeric, Date, Time, Timestamp and SQLXML 
columns may not contain NULL values.");
                        }
                }
        }

Reply via email to