snuyanzin commented on code in PR #18:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/18#discussion_r1098443002


##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java:
##########
@@ -111,39 +126,338 @@ class MySqlCatalogTestBase {
                     .primaryKeyNamed("PRIMARY", Lists.newArrayList("pid"))
                     .build();
 
-    public static final Map<String, MySQLContainer<?>> MYSQL_CONTAINERS = new 
HashMap<>();
-    public static final Map<String, MySqlCatalog> CATALOGS = new HashMap<>();
-
-    @BeforeAll
-    static void beforeAll() throws SQLException {
-        for (String dockerImageName : DOCKER_IMAGE_NAMES) {
-            MySQLContainer<?> container =
-                    new 
MySQLContainer<>(DockerImageName.parse(dockerImageName))
-                            .withUsername("root")
-                            .withPassword("")
-                            .withEnv(DEFAULT_CONTAINER_ENV_MAP)
-                            .withInitScript(MYSQL_INIT_SCRIPT)
-                            .withLogConsumer(new Slf4jLogConsumer(LOG));
-            container.start();
-            MYSQL_CONTAINERS.put(dockerImageName, container);
-            CATALOGS.put(
-                    dockerImageName,
-                    new MySqlCatalog(
-                            Thread.currentThread().getContextClassLoader(),
-                            TEST_CATALOG_NAME,
-                            TEST_DB,
-                            TEST_USERNAME,
-                            TEST_PWD,
-                            container
-                                    .getJdbcUrl()
-                                    .substring(0, 
container.getJdbcUrl().lastIndexOf("/"))));
-        }
-    }
-
-    @AfterAll
-    static void cleanup() {
-        for (MySQLContainer<?> container : MYSQL_CONTAINERS.values()) {
-            container.stop();
-        }
+    protected static final List<Row> TABLE_ROWS =
+            Lists.newArrayList(
+                    Row.ofKind(
+                            RowKind.INSERT,
+                            1L,
+                            -1L,
+                            new BigDecimal(1),
+                            null,
+                            true,
+                            null,
+                            "hello",
+                            Date.valueOf("2021-08-04").toLocalDate(),
+                            Timestamp.valueOf("2021-08-04 
01:54:16").toLocalDateTime(),
+                            new BigDecimal(-1),
+                            new BigDecimal(1),
+                            -1.0d,
+                            1.0d,
+                            "enum2",
+                            -9.1f,
+                            9.1f,
+                            -1,
+                            1L,
+                            -1,
+                            1L,
+                            null,
+                            "col_longtext",
+                            null,
+                            -1,
+                            1,
+                            "col_mediumtext",
+                            new BigDecimal(-99),
+                            new BigDecimal(99),
+                            -1.0d,
+                            1.0d,
+                            "set_ele1",
+                            Short.parseShort("-1"),
+                            1,
+                            "col_text",
+                            Time.valueOf("10:32:34").toLocalTime(),
+                            Timestamp.valueOf("2021-08-04 
01:54:16").toLocalDateTime(),
+                            "col_tinytext",
+                            Byte.parseByte("-1"),
+                            Short.parseShort("1"),
+                            null,
+                            "col_varchar",
+                            Timestamp.valueOf("2021-08-04 
01:54:16.463").toLocalDateTime(),
+                            Time.valueOf("09:33:43").toLocalTime(),
+                            Timestamp.valueOf("2021-08-04 
01:54:16.463").toLocalDateTime(),
+                            null),
+                    Row.ofKind(
+                            RowKind.INSERT,
+                            2L,
+                            -1L,
+                            new BigDecimal(1),
+                            null,
+                            true,
+                            null,
+                            "hello",
+                            Date.valueOf("2021-08-04").toLocalDate(),
+                            Timestamp.valueOf("2021-08-04 
01:53:19").toLocalDateTime(),
+                            new BigDecimal(-1),
+                            new BigDecimal(1),
+                            -1.0d,
+                            1.0d,
+                            "enum2",
+                            -9.1f,
+                            9.1f,
+                            -1,
+                            1L,
+                            -1,
+                            1L,
+                            null,
+                            "col_longtext",
+                            null,
+                            -1,
+                            1,
+                            "col_mediumtext",
+                            new BigDecimal(-99),
+                            new BigDecimal(99),
+                            -1.0d,
+                            1.0d,
+                            "set_ele1,set_ele12",
+                            Short.parseShort("-1"),
+                            1,
+                            "col_text",
+                            Time.valueOf("10:32:34").toLocalTime(),
+                            Timestamp.valueOf("2021-08-04 
01:53:19").toLocalDateTime(),
+                            "col_tinytext",
+                            Byte.parseByte("-1"),
+                            Short.parseShort("1"),
+                            null,
+                            "col_varchar",
+                            Timestamp.valueOf("2021-08-04 
01:53:19.098").toLocalDateTime(),
+                            Time.valueOf("09:33:43").toLocalTime(),
+                            Timestamp.valueOf("2021-08-04 
01:53:19.098").toLocalDateTime(),
+                            null));
+
+    private MySqlCatalog catalog;
+    private TableEnvironment tEnv;
+
+    protected static MySQLContainer<?> createContainer(String dockerImage) {
+        return new MySQLContainer<>(DockerImageName.parse(dockerImage))
+                .withUsername("root")
+                .withPassword("")
+                .withEnv(DEFAULT_CONTAINER_ENV_MAP)
+                .withInitScript(MYSQL_INIT_SCRIPT)
+                .withLogConsumer(new Slf4jLogConsumer(LOG));
+    }
+
+    protected abstract String getDatabaseUrl();
+
+    @BeforeEach
+    void setup() {
+        catalog =
+                new MySqlCatalog(
+                        Thread.currentThread().getContextClassLoader(),
+                        TEST_CATALOG_NAME,
+                        TEST_DB,
+                        TEST_USERNAME,
+                        TEST_PWD,
+                        getDatabaseUrl());
+
+        this.tEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+        // Use mysql catalog.
+        tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
+        tEnv.useCatalog(TEST_CATALOG_NAME);
+    }
+
+    @Test
+    void testGetDb_DatabaseNotExistException() throws Exception {
+        String databaseNotExist = "nonexistent";
+        assertThatThrownBy(() -> catalog.getDatabase(databaseNotExist))
+                .satisfies(
+                        anyCauseMatches(
+                                DatabaseNotExistException.class,
+                                String.format(
+                                        "Database %s does not exist in 
Catalog",
+                                        databaseNotExist)));
+    }
+
+    @Test
+    void testListDatabases() {
+        List<String> actual = catalog.listDatabases();
+        assertThat(actual).containsExactly(TEST_DB, TEST_DB2);
+    }
+
+    @Test
+    void testDbExists() throws Exception {
+        String databaseNotExist = "nonexistent";
+        assertThat(catalog.databaseExists(databaseNotExist)).isFalse();
+        assertThat(catalog.databaseExists(TEST_DB)).isTrue();
+    }
+
+    // ------ tables ------
+
+    @Test
+    void testListTables() throws DatabaseNotExistException {
+        List<String> actual = catalog.listTables(TEST_DB);
+        assertThat(actual)
+                .isEqualTo(
+                        Arrays.asList(
+                                TEST_TABLE_ALL_TYPES,
+                                TEST_SINK_TABLE_ALL_TYPES,
+                                TEST_TABLE_SINK_FROM_GROUPED_BY,
+                                TEST_TABLE_PK));
+    }
+
+    @Test
+    void testListTables_DatabaseNotExistException() throws 
DatabaseNotExistException {
+        String anyDatabase = "anyDatabase";
+        assertThatThrownBy(() -> catalog.listTables(anyDatabase))
+                .satisfies(
+                        anyCauseMatches(
+                                DatabaseNotExistException.class,
+                                String.format(
+                                        "Database %s does not exist in 
Catalog", anyDatabase)));
+    }
+
+    @Test
+    void testTableExists() {
+        String tableNotExist = "nonexist";
+        assertThat(catalog.tableExists(new ObjectPath(TEST_DB, 
tableNotExist))).isFalse();
+        assertThat(catalog.tableExists(new ObjectPath(TEST_DB, 
TEST_TABLE_ALL_TYPES))).isTrue();
+    }
+
+    @Test
+    void testGetTables_TableNotExistException() throws TableNotExistException {
+        String anyTableNotExist = "anyTable";
+        assertThatThrownBy(() -> catalog.getTable(new ObjectPath(TEST_DB, 
anyTableNotExist)))
+                .satisfies(
+                        anyCauseMatches(
+                                TableNotExistException.class,
+                                String.format(
+                                        "Table (or view) %s.%s does not exist 
in Catalog",
+                                        TEST_DB, anyTableNotExist)));
+    }
+
+    @Test
+    void testGetTables_TableNotExistException_NoDb() throws 
TableNotExistException {
+        String databaseNotExist = "nonexistdb";
+        String tableNotExist = "anyTable";
+        assertThatThrownBy(() -> catalog.getTable(new 
ObjectPath(databaseNotExist, tableNotExist)))
+                .satisfies(
+                        anyCauseMatches(
+                                TableNotExistException.class,
+                                String.format(
+                                        "Table (or view) %s.%s does not exist 
in Catalog",
+                                        databaseNotExist, tableNotExist)));
+    }
+
+    @Test
+    void testGetTable() throws TableNotExistException {
+        CatalogBaseTable table = catalog.getTable(new ObjectPath(TEST_DB, 
TEST_TABLE_ALL_TYPES));
+        assertThat(table.getUnresolvedSchema()).isEqualTo(TABLE_SCHEMA);
+    }
+
+    @Test
+    void testGetTablePrimaryKey() throws TableNotExistException {
+        // test the PK of test.t_user
+        Schema tableSchemaTestPK1 =
+                Schema.newBuilder()
+                        .column("uid", DataTypes.BIGINT().notNull())
+                        .column("col_bigint", DataTypes.BIGINT())
+                        .primaryKeyNamed("PRIMARY", 
Collections.singletonList("uid"))
+                        .build();
+        CatalogBaseTable tablePK1 = catalog.getTable(new ObjectPath(TEST_DB, 
TEST_TABLE_PK));
+        assertThat(tableSchemaTestPK1.getPrimaryKey().get())
+                
.isEqualTo(tablePK1.getUnresolvedSchema().getPrimaryKey().get());

Review Comment:
   ```suggestion
           assertThat(tableSchemaTestPK1.getPrimaryKey()).contains(tablePK1.getUnresolvedSchema().getPrimaryKey().get());
   ```
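   nit: AssertJ provides a `contains` assertion for `Optional`, so the expected `Optional` does not need to be unwrapped with `.get()` before asserting.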



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to