fpompermaier commented on a change in pull request #11906:
URL: https://github.com/apache/flink/pull/11906#discussion_r427972573



##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
##########
@@ -126,31 +124,33 @@ public String getBaseUrl() {
 
        // ------ retrieve PK constraint ------
 
-       protected UniqueConstraint getPrimaryKey(DatabaseMetaData metaData, String schema, String table) throws SQLException {
+       protected Optional<UniqueConstraint> getPrimaryKey(DatabaseMetaData metaData, String schema, String table) throws SQLException {
 
                // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
                // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
                // We need to sort them based on the KEY_SEQ value.
                ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
 
-               List<Map.Entry<Integer, String>> columnsWithIndex = null;
+               Map<Integer, String> keySeqColumnName = new HashMap<>();
                String pkName = null;
-               while (rs.next()) {
+               while (rs.next())  {
                        String columnName = rs.getString("COLUMN_NAME");
-                       pkName = rs.getString("PK_NAME");
+                       pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
                        int keySeq = rs.getInt("KEY_SEQ");
-                       if (columnsWithIndex == null) {
-                               columnsWithIndex = new ArrayList<>();
-                       }
-                       columnsWithIndex.add(new AbstractMap.SimpleEntry<>(Integer.valueOf(keySeq), columnName));
+                       keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index
                }
-               if (columnsWithIndex != null) {
-                       // sort columns by KEY_SEQ
-                       columnsWithIndex.sort(Comparator.comparingInt(Map.Entry::getKey));
-                       List<String> cols = columnsWithIndex.stream().map(Map.Entry::getValue).collect(Collectors.toList());
-                       return UniqueConstraint.primaryKey(pkName, cols);
+               List<String> pkFields = Arrays.asList(new String[keySeqColumnName.size()]); // initialize size
+               keySeqColumnName.forEach(pkFields::set);

Review comment:
       Very neat and brilliant pattern. I learned something new :+1:
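
For anyone reading along, here is a minimal standalone sketch of the pattern as I understand it from the diff. The class name `KeySeqOrdering`, the helper `orderByKeySeq`, and the sample column names are hypothetical and not part of the PR; the sketch also assumes the KEY_SEQ values are contiguous (1..n), since an index outside the backing array would presumably throw.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo class, not part of AbstractJdbcCatalog.
class KeySeqOrdering {

    // Places each column name at the zero-based position stored as its map key.
    static List<String> orderByKeySeq(Map<Integer, String> keySeqColumnName) {
        // Arrays.asList over an array yields a fixed-size list backed by that array:
        // set(index, value) is supported, add/remove are not.
        List<String> pkFields = Arrays.asList(new String[keySeqColumnName.size()]);
        // Map.forEach expects a BiConsumer<Integer, String>; List.set(int, String)
        // fits that shape, and its return value is simply discarded.
        keySeqColumnName.forEach(pkFields::set);
        return pkFields;
    }

    public static void main(String[] args) {
        Map<Integer, String> keySeqColumnName = new HashMap<>();
        // Rows arrive ordered by COLUMN_NAME, so KEY_SEQ 2 may come before KEY_SEQ 1.
        keySeqColumnName.put(2 - 1, "order_id");  // KEY_SEQ = 2
        keySeqColumnName.put(1 - 1, "tenant_id"); // KEY_SEQ = 1
        System.out.println(orderByKeySeq(keySeqColumnName)); // prints [tenant_id, order_id]
    }
}
```

The point of the pattern is that no explicit sort is needed: each entry lands directly in its final slot.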




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

