leonardBang commented on code in PR #19741:
URL: https://github.com/apache/flink/pull/19741#discussion_r897818461


##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java:
##########
@@ -251,6 +267,21 @@ public void testGetTable() throws TableNotExistException {
         assertEquals(TABLE_SCHEMA, table.getUnresolvedSchema());
     }
 
+    @Test
+    public void testGetTablePrimaryKey() throws TableNotExistException {
+        Schema tableSchemaUser =

Review Comment:
   IIUC, we need another table in another database with same name 'user' and 
different pk 'another_pk'  to validate your PR works



##########
flink-connectors/flink-connector-jdbc/src/test/resources/mysql-scripts/catalog-init-for-test.sql:
##########
@@ -59,7 +58,7 @@ CREATE TABLE `t_all_types` (
   `col_int_unsigned` int(10) unsigned DEFAULT NULL,
   `col_integer` int(11) DEFAULT NULL,
   `col_integer_unsigned` int(10) unsigned DEFAULT NULL,
-  `col_json` json DEFAULT NULL,
+  `col_json` longtext DEFAULT NULL,

Review Comment:
   The column name `col_json` and type `longtext` is not matched, we can skip 
the json type test by removing this field. Although this is a minor test 
fallback, I think it's okay as we did not cover all types like GEO type too.



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##########
@@ -144,19 +144,25 @@ public String getBaseUrl() {
     // ------ retrieve PK constraint ------
 
     protected Optional<UniqueConstraint> getPrimaryKey(
-            DatabaseMetaData metaData, String schema, String table) throws 
SQLException {
+            DatabaseMetaData metaData, String database, String schema, String 
table)
+            throws SQLException {
 
         // According to the Javadoc of 
java.sql.DatabaseMetaData#getPrimaryKeys,
         // the returned primary key columns are ordered by COLUMN_NAME, not by 
KEY_SEQ.
         // We need to sort them based on the KEY_SEQ value.
-        ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
+        // In the currently supported databases, database is equivalent to 
catalog.
+        // We need to pass the name of the database to represent catalog.
+        ResultSet rs = metaData.getPrimaryKeys(database, schema, table);
 
         Map<Integer, String> keySeqColumnName = new HashMap<>();
         String pkName = null;
         while (rs.next()) {
             String columnName = rs.getString("COLUMN_NAME");
             pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the 
same
             int keySeq = rs.getInt("KEY_SEQ");
+            Preconditions.checkState(
+                    !keySeqColumnName.containsKey(keySeq - 1),
+                    "The PK must be from the same table.");

Review Comment:
   The field(s) of primary key must be from the same table.



##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java:
##########
@@ -106,23 +116,51 @@ public class MySqlCatalogTestBase {
                     .primaryKeyNamed("PRIMARY", Lists.newArrayList("pid"))
                     .build();
 
-    @ClassRule
-    public static final MySQLContainer<?> MYSQL_CONTAINER =
-            new MySQLContainer<>(MYSQL_57_IMAGE)
-                    .withUsername("root")
-                    .withPassword("")
-                    .withEnv(DEFAULT_CONTAINER_ENV_MAP)
-                    .withInitScript(MYSQL_INIT_SCRIPT)
-                    .withLogConsumer(new Slf4jLogConsumer(LOG));
+    public static final Map<String, MySQLContainer<?>> MYSQL_CONTAINERS = new 
HashMap<>();
 
-    protected static MySqlCatalog catalog;
+    public static final Map<String, MySqlCatalog> CATALOGS = new HashMap<>();
 
     @BeforeClass
-    public static void beforeAll() {
-        String baseUrl =
-                MYSQL_CONTAINER
-                        .getJdbcUrl()
-                        .substring(0, 
MYSQL_CONTAINER.getJdbcUrl().lastIndexOf("/"));
-        catalog = new MySqlCatalog(TEST_CATALOG_NAME, TEST_DB, TEST_USERNAME, 
TEST_PWD, baseUrl);
+    public static void beforeAll() throws SQLException {
+        for (String dockerImageName : DOCKER_IMAGE_NAMES) {
+            MySQLContainer<?> container =
+                    new 
MySQLContainer<>(DockerImageName.parse(dockerImageName))
+                            .withUsername("root")
+                            .withPassword("")
+                            .withEnv(DEFAULT_CONTAINER_ENV_MAP)
+                            .withInitScript(MYSQL_INIT_SCRIPT)
+                            .withLogConsumer(new Slf4jLogConsumer(LOG));
+            container.start();
+            MYSQL_CONTAINERS.put(dockerImageName, container);
+
+            String baseUrl =
+                    container.getJdbcUrl().substring(0, 
container.getJdbcUrl().lastIndexOf("/"));
+
+            CATALOGS.put(
+                    dockerImageName,
+                    new MySqlCatalog(TEST_CATALOG_NAME, TEST_DB, 
TEST_USERNAME, TEST_PWD, baseUrl));
+
+            try (Connection conn = DriverManager.getConnection(baseUrl, 
TEST_USERNAME, TEST_PWD);
+                    Statement stat = conn.createStatement()) {
+                if (!dockerImageName.equals("mysql:5.6.51")) {
+                    // The tables ddl according to 5.6.x,
+                    // we need to modify the type of the field col_json to 
json in 5.7.x and 8.0.x.

Review Comment:
   Let’s remove the json type cover for simplification.



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##########
@@ -144,19 +144,25 @@ public String getBaseUrl() {
     // ------ retrieve PK constraint ------
 
     protected Optional<UniqueConstraint> getPrimaryKey(
-            DatabaseMetaData metaData, String schema, String table) throws 
SQLException {
+            DatabaseMetaData metaData, String database, String schema, String 
table)
+            throws SQLException {
 
         // According to the Javadoc of 
java.sql.DatabaseMetaData#getPrimaryKeys,
         // the returned primary key columns are ordered by COLUMN_NAME, not by 
KEY_SEQ.
         // We need to sort them based on the KEY_SEQ value.
-        ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
+        // In the currently supported databases, database is equivalent to 
catalog.

Review Comment:
    // In the currently supported database dialects MySQL and Postgres, the 
database term is equivalent to catalog term. We need to pass the database name 
as catalog parameter for retrieving primary keys by full table identifier  



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java:
##########
@@ -169,6 +169,11 @@ protected String getSchemaName(ObjectPath tablePath) {
         return 
PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgSchemaName();
     }
 
+    @Override
+    protected String getDatabaseName(ObjectPath tablePath) {
+        return 
PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgSchemaName();

Review Comment:
   Please add a test for this, I remember we use 'catalog.db.\`schema.table\`' 
in Flink for Postgres table representation, the databaseName is diff with 
schemaName.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to