This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 82667a8  [FLINK-13490][jdbc] Fix return null in 
JDBCUtils::getFieldFromResultSet
82667a8 is described below

commit 82667a816f31d20da93eb4464d01fc9fe2fb1bc5
Author: TsReaper <[email protected]>
AuthorDate: Tue Jul 30 08:41:08 2019 +0800

    [FLINK-13490][jdbc] Fix return null in JDBCUtils::getFieldFromResultSet
---
 .../apache/flink/api/java/io/jdbc/JDBCUtils.java   | 59 ++++++++++++++--------
 .../api/java/io/jdbc/JDBCLookupFunctionITCase.java | 33 +++++++++---
 2 files changed, 63 insertions(+), 29 deletions(-)

diff --git 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java
 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java
index 9bc948a..d50b82e 100644
--- 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java
+++ 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUtils.java
@@ -152,53 +152,64 @@ public class JDBCUtils {
        }
 
        public static Object getFieldFromResultSet(int index, int type, 
ResultSet set) throws SQLException {
-               if (set.wasNull()) {
-                       return null;
-               }
-
+               Object ret;
                switch (type) {
                        case java.sql.Types.NULL:
-                               return null;
+                               ret = null;
+                               break;
                        case java.sql.Types.BOOLEAN:
                        case java.sql.Types.BIT:
-                               return set.getBoolean(index + 1);
+                               ret = set.getBoolean(index + 1);
+                               break;
                        case java.sql.Types.CHAR:
                        case java.sql.Types.NCHAR:
                        case java.sql.Types.VARCHAR:
                        case java.sql.Types.LONGVARCHAR:
                        case java.sql.Types.LONGNVARCHAR:
-                               return set.getString(index + 1);
+                               ret = set.getString(index + 1);
+                               break;
                        case java.sql.Types.TINYINT:
-                               return set.getByte(index + 1);
+                               ret = set.getByte(index + 1);
+                               break;
                        case java.sql.Types.SMALLINT:
-                               return set.getShort(index + 1);
+                               ret = set.getShort(index + 1);
+                               break;
                        case java.sql.Types.INTEGER:
-                               return set.getInt(index + 1);
+                               ret = set.getInt(index + 1);
+                               break;
                        case java.sql.Types.BIGINT:
-                               return set.getLong(index + 1);
+                               ret = set.getLong(index + 1);
+                               break;
                        case java.sql.Types.REAL:
-                               return set.getFloat(index + 1);
+                               ret = set.getFloat(index + 1);
+                               break;
                        case java.sql.Types.FLOAT:
                        case java.sql.Types.DOUBLE:
-                               return set.getDouble(index + 1);
+                               ret = set.getDouble(index + 1);
+                               break;
                        case java.sql.Types.DECIMAL:
                        case java.sql.Types.NUMERIC:
-                               return set.getBigDecimal(index + 1);
+                               ret = set.getBigDecimal(index + 1);
+                               break;
                        case java.sql.Types.DATE:
-                               return set.getDate(index + 1);
+                               ret = set.getDate(index + 1);
+                               break;
                        case java.sql.Types.TIME:
-                               return set.getTime(index + 1);
+                               ret = set.getTime(index + 1);
+                               break;
                        case java.sql.Types.TIMESTAMP:
-                               return set.getTimestamp(index + 1);
+                               ret = set.getTimestamp(index + 1);
+                               break;
                        case java.sql.Types.BINARY:
                        case java.sql.Types.VARBINARY:
                        case java.sql.Types.LONGVARBINARY:
-                               return set.getBytes(index + 1);
+                               ret = set.getBytes(index + 1);
+                               break;
                        default:
-                               Object value = set.getObject(index + 1);
+                               ret = set.getObject(index + 1);
                                LOG.warn("Unmanaged sql type ({}) for column 
{}. Best effort approach to get its value: {}.",
-                                               type, index + 1, value);
-                               return value;
+                                       type, index + 1, ret);
+                               break;
 
                        // case java.sql.Types.SQLXML
                        // case java.sql.Types.ARRAY:
@@ -213,5 +224,11 @@ public class JDBCUtils {
                        // case java.sql.Types.ROWID:
                        // case java.sql.Types.STRUC
                }
+
+               if (set.wasNull()) {
+                       return null;
+               } else {
+                       return ret;
+               }
        }
 }
diff --git 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java
 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java
index 3d66001..16a0eeb 100644
--- 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java
+++ 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java
@@ -84,20 +84,37 @@ public class JDBCLookupFunctionITCase extends 
AbstractTestBase {
                        Object[][] data = new Object[][] {
                                        new Object[] {1, 1, "11-c1-v1", 
"11-c2-v1"},
                                        new Object[] {1, 1, "11-c1-v2", 
"11-c2-v2"},
-                                       new Object[] {2, 3, "23-c1", "23-c2"},
+                                       new Object[] {2, 3, null, "23-c2"},
                                        new Object[] {2, 5, "25-c1", "25-c2"},
                                        new Object[] {3, 8, "38-c1", "38-c2"}
                        };
+                       boolean[] surroundedByQuotes = new boolean[] {
+                               false, false, true, true
+                       };
+
                        StringBuilder sqlQueryBuilder = new StringBuilder(
                                        "INSERT INTO " + LOOKUP_TABLE + " (id1, 
id2, comment1, comment2) VALUES ");
                        for (int i = 0; i < data.length; i++) {
-                               sqlQueryBuilder.append("(")
-                                               .append(data[i][0]).append(",")
-                                               .append(data[i][1]).append(",'")
-                                               
.append(data[i][2]).append("','")
-                                               
.append(data[i][3]).append("')");
+                               sqlQueryBuilder.append("(");
+                               for (int j = 0; j < data[i].length; j++) {
+                                       if (data[i][j] == null) {
+                                               sqlQueryBuilder.append("null");
+                                       } else {
+                                               if (surroundedByQuotes[j]) {
+                                                       
sqlQueryBuilder.append("'");
+                                               }
+                                               
sqlQueryBuilder.append(data[i][j]);
+                                               if (surroundedByQuotes[j]) {
+                                                       
sqlQueryBuilder.append("'");
+                                               }
+                                       }
+                                       if (j < data[i].length - 1) {
+                                               sqlQueryBuilder.append(", ");
+                                       }
+                               }
+                               sqlQueryBuilder.append(")");
                                if (i < data.length - 1) {
-                                       sqlQueryBuilder.append(",");
+                                       sqlQueryBuilder.append(", ");
                                }
                        }
                        stat.execute(sqlQueryBuilder.toString());
@@ -160,7 +177,7 @@ public class JDBCLookupFunctionITCase extends 
AbstractTestBase {
                expected.add("1,1,11-c1-v1,11-c2-v1");
                expected.add("1,1,11-c1-v2,11-c2-v2");
                expected.add("1,1,11-c1-v2,11-c2-v2");
-               expected.add("2,3,23-c1,23-c2");
+               expected.add("2,3,null,23-c2");
                expected.add("2,5,25-c1,25-c2");
                expected.add("3,8,38-c1,38-c2");
 

Reply via email to