wuchong commented on a change in pull request #11663: [FLINK-16820][jdbc] support reading timestamp, data, and time in JDBCTableSource URL: https://github.com/apache/flink/pull/11663#discussion_r405944872
########## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/AbstractJDBCRowConverter.java ########## @@ -32,18 +34,56 @@ public abstract class AbstractJDBCRowConverter implements JDBCRowConverter { protected final RowType rowType; + protected final JDBCFieldConverter[] converters; public AbstractJDBCRowConverter(RowType rowType) { this.rowType = checkNotNull(rowType); + converters = new JDBCFieldConverter[rowType.getFieldCount()]; + + for (int i = 0; i < converters.length; i++) { + converters[i] = createConverter(rowType.getTypeAt(i)); + } } @Override public Row convert(ResultSet resultSet, Row reuse) throws SQLException { for (int pos = 0; pos < rowType.getFieldCount(); pos++) { - Object v = resultSet.getObject(pos + 1); - reuse.setField(pos, v); + reuse.setField(pos, converters[pos].convert(resultSet.getObject(pos + 1))); } return reuse; } + + private JDBCFieldConverter createConverter(LogicalType type) { + LogicalTypeRoot root = type.getTypeRoot(); + + if (root == LogicalTypeRoot.SMALLINT) { + return new SmallJDBCFieldConverter(); + } else { + return new DefaultJDBCFieldConverter(); + } Review comment: Can be simplified to Java lambda: ```java if (root == LogicalTypeRoot.SMALLINT) { // Converter for smallint type that casts value to int and then return short value, // since JDBC 1.0 use int type for smallint values. return value -> ((Integer) value).shortValue(); } return value -> value; ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services