This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 0611b93d [Improvment](cdc) Use uniq index as primary key (#516)
0611b93d is described below

commit 0611b93d0218c6ba9d547c50a23f910a0bbe1cf9
Author: wudi <[email protected]>
AuthorDate: Mon Nov 25 10:45:51 2024 +0800

    [Improvment](cdc) Use uniq index as primary key (#516)
---
 .../doris/flink/tools/cdc/JdbcSourceSchema.java    | 33 ++++++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
index 31cfd1cb..2547b976 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
@@ -27,8 +27,10 @@ import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * JdbcSourceSchema is a subclass of SourceSchema, used to build metadata about jdbc-related
@@ -47,6 +49,10 @@ public abstract class JdbcSourceSchema extends SourceSchema {
         super(databaseName, schemaName, tableName, tableComment);
         fields = getColumnInfo(metaData, databaseName, schemaName, tableName);
         primaryKeys = getPrimaryKeys(metaData, databaseName, schemaName, tableName);
+        if (primaryKeys.isEmpty()) {
+            List<String> uniqIndex = getUniqIndex(metaData, databaseName, schemaName, tableName);
+            primaryKeys.addAll(uniqIndex);
+        }
     }
 
     public LinkedHashMap<String, FieldSchema> getColumnInfo(
@@ -96,5 +102,32 @@ public abstract class JdbcSourceSchema extends SourceSchema {
         return primaryKeys;
     }
 
+    /**
+     * Get the unique index of the table If the primary key is empty but there is a uniq key, then
+     * use the uniqkey instead of the primarykey
+     */
+    public List<String> getUniqIndex(
+            DatabaseMetaData metaData, String databaseName, String schemaName, String tableName)
+            throws SQLException {
+        Map<String, List<String>> uniqIndexMap = new HashMap<>();
+        String firstIndexName = null;
+        try (ResultSet rs =
+                metaData.getIndexInfo(databaseName, schemaName, tableName, true, true)) {
+            while (rs.next()) {
+                String columnName = rs.getString("COLUMN_NAME");
+                String indexName = rs.getString("INDEX_NAME");
+                if (firstIndexName == null) {
+                    firstIndexName = indexName;
+                }
+                uniqIndexMap.computeIfAbsent(indexName, k -> new ArrayList<>()).add(columnName);
+            }
+        }
+        if (!uniqIndexMap.isEmpty()) {
+            // If there are multiple uniq indices, return one
+            return uniqIndexMap.get(firstIndexName);
+        }
+        return new ArrayList<>();
+    }
+
     public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to