snuyanzin commented on code in PR #18:
URL:
https://github.com/apache/flink-connector-jdbc/pull/18#discussion_r1098441527
##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java:
##########
@@ -111,39 +126,338 @@ class MySqlCatalogTestBase {
.primaryKeyNamed("PRIMARY", Lists.newArrayList("pid"))
.build();
- public static final Map<String, MySQLContainer<?>> MYSQL_CONTAINERS = new
HashMap<>();
- public static final Map<String, MySqlCatalog> CATALOGS = new HashMap<>();
-
- @BeforeAll
- static void beforeAll() throws SQLException {
- for (String dockerImageName : DOCKER_IMAGE_NAMES) {
- MySQLContainer<?> container =
- new
MySQLContainer<>(DockerImageName.parse(dockerImageName))
- .withUsername("root")
- .withPassword("")
- .withEnv(DEFAULT_CONTAINER_ENV_MAP)
- .withInitScript(MYSQL_INIT_SCRIPT)
- .withLogConsumer(new Slf4jLogConsumer(LOG));
- container.start();
- MYSQL_CONTAINERS.put(dockerImageName, container);
- CATALOGS.put(
- dockerImageName,
- new MySqlCatalog(
- Thread.currentThread().getContextClassLoader(),
- TEST_CATALOG_NAME,
- TEST_DB,
- TEST_USERNAME,
- TEST_PWD,
- container
- .getJdbcUrl()
- .substring(0,
container.getJdbcUrl().lastIndexOf("/"))));
- }
- }
-
- @AfterAll
- static void cleanup() {
- for (MySQLContainer<?> container : MYSQL_CONTAINERS.values()) {
- container.stop();
- }
+ protected static final List<Row> TABLE_ROWS =
+ Lists.newArrayList(
+ Row.ofKind(
+ RowKind.INSERT,
+ 1L,
+ -1L,
+ new BigDecimal(1),
+ null,
+ true,
+ null,
+ "hello",
+ Date.valueOf("2021-08-04").toLocalDate(),
+ Timestamp.valueOf("2021-08-04
01:54:16").toLocalDateTime(),
+ new BigDecimal(-1),
+ new BigDecimal(1),
+ -1.0d,
+ 1.0d,
+ "enum2",
+ -9.1f,
+ 9.1f,
+ -1,
+ 1L,
+ -1,
+ 1L,
+ null,
+ "col_longtext",
+ null,
+ -1,
+ 1,
+ "col_mediumtext",
+ new BigDecimal(-99),
+ new BigDecimal(99),
+ -1.0d,
+ 1.0d,
+ "set_ele1",
+ Short.parseShort("-1"),
+ 1,
+ "col_text",
+ Time.valueOf("10:32:34").toLocalTime(),
+ Timestamp.valueOf("2021-08-04
01:54:16").toLocalDateTime(),
+ "col_tinytext",
+ Byte.parseByte("-1"),
+ Short.parseShort("1"),
+ null,
+ "col_varchar",
+ Timestamp.valueOf("2021-08-04
01:54:16.463").toLocalDateTime(),
+ Time.valueOf("09:33:43").toLocalTime(),
+ Timestamp.valueOf("2021-08-04
01:54:16.463").toLocalDateTime(),
+ null),
+ Row.ofKind(
+ RowKind.INSERT,
+ 2L,
+ -1L,
+ new BigDecimal(1),
+ null,
+ true,
+ null,
+ "hello",
+ Date.valueOf("2021-08-04").toLocalDate(),
+ Timestamp.valueOf("2021-08-04
01:53:19").toLocalDateTime(),
+ new BigDecimal(-1),
+ new BigDecimal(1),
+ -1.0d,
+ 1.0d,
+ "enum2",
+ -9.1f,
+ 9.1f,
+ -1,
+ 1L,
+ -1,
+ 1L,
+ null,
+ "col_longtext",
+ null,
+ -1,
+ 1,
+ "col_mediumtext",
+ new BigDecimal(-99),
+ new BigDecimal(99),
+ -1.0d,
+ 1.0d,
+ "set_ele1,set_ele12",
+ Short.parseShort("-1"),
+ 1,
+ "col_text",
+ Time.valueOf("10:32:34").toLocalTime(),
+ Timestamp.valueOf("2021-08-04
01:53:19").toLocalDateTime(),
+ "col_tinytext",
+ Byte.parseByte("-1"),
+ Short.parseShort("1"),
+ null,
+ "col_varchar",
+ Timestamp.valueOf("2021-08-04
01:53:19.098").toLocalDateTime(),
+ Time.valueOf("09:33:43").toLocalTime(),
+ Timestamp.valueOf("2021-08-04
01:53:19.098").toLocalDateTime(),
+ null));
+
+ private MySqlCatalog catalog;
+ private TableEnvironment tEnv;
+
+ protected static MySQLContainer<?> createContainer(String dockerImage) {
+ return new MySQLContainer<>(DockerImageName.parse(dockerImage))
+ .withUsername("root")
+ .withPassword("")
+ .withEnv(DEFAULT_CONTAINER_ENV_MAP)
+ .withInitScript(MYSQL_INIT_SCRIPT)
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+ }
+
+ protected abstract String getDatabaseUrl();
+
+ @BeforeEach
+ void setup() {
+ catalog =
+ new MySqlCatalog(
+ Thread.currentThread().getContextClassLoader(),
+ TEST_CATALOG_NAME,
+ TEST_DB,
+ TEST_USERNAME,
+ TEST_PWD,
+ getDatabaseUrl());
+
+ this.tEnv =
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+ tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+ // Use mysql catalog.
+ tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
+ tEnv.useCatalog(TEST_CATALOG_NAME);
+ }
+
+ @Test
+ void testGetDb_DatabaseNotExistException() throws Exception {
+ String databaseNotExist = "nonexistent";
+ assertThatThrownBy(() -> catalog.getDatabase(databaseNotExist))
+ .satisfies(
+ anyCauseMatches(
+ DatabaseNotExistException.class,
+ String.format(
+ "Database %s does not exist in
Catalog",
+ databaseNotExist)));
+ }
+
+ @Test
+ void testListDatabases() {
+ List<String> actual = catalog.listDatabases();
+ assertThat(actual).containsExactly(TEST_DB, TEST_DB2);
+ }
+
+ @Test
+ void testDbExists() throws Exception {
+ String databaseNotExist = "nonexistent";
+ assertThat(catalog.databaseExists(databaseNotExist)).isFalse();
+ assertThat(catalog.databaseExists(TEST_DB)).isTrue();
+ }
+
+ // ------ tables ------
+
+ @Test
+ void testListTables() throws DatabaseNotExistException {
+ List<String> actual = catalog.listTables(TEST_DB);
+ assertThat(actual)
+ .isEqualTo(
+ Arrays.asList(
+ TEST_TABLE_ALL_TYPES,
+ TEST_SINK_TABLE_ALL_TYPES,
+ TEST_TABLE_SINK_FROM_GROUPED_BY,
+ TEST_TABLE_PK));
+ }
+
+ @Test
+ void testListTables_DatabaseNotExistException() throws
DatabaseNotExistException {
+ String anyDatabase = "anyDatabase";
+ assertThatThrownBy(() -> catalog.listTables(anyDatabase))
+ .satisfies(
+ anyCauseMatches(
+ DatabaseNotExistException.class,
+ String.format(
+ "Database %s does not exist in
Catalog", anyDatabase)));
+ }
+
+ @Test
+ void testTableExists() {
+ String tableNotExist = "nonexist";
+ assertThat(catalog.tableExists(new ObjectPath(TEST_DB,
tableNotExist))).isFalse();
+ assertThat(catalog.tableExists(new ObjectPath(TEST_DB,
TEST_TABLE_ALL_TYPES))).isTrue();
+ }
+
+ @Test
+ void testGetTables_TableNotExistException() throws TableNotExistException {
+ String anyTableNotExist = "anyTable";
+ assertThatThrownBy(() -> catalog.getTable(new ObjectPath(TEST_DB,
anyTableNotExist)))
+ .satisfies(
+ anyCauseMatches(
+ TableNotExistException.class,
+ String.format(
+ "Table (or view) %s.%s does not exist
in Catalog",
+ TEST_DB, anyTableNotExist)));
+ }
+
+ @Test
+ void testGetTables_TableNotExistException_NoDb() throws
TableNotExistException {
+ String databaseNotExist = "nonexistdb";
+ String tableNotExist = "anyTable";
+ assertThatThrownBy(() -> catalog.getTable(new
ObjectPath(databaseNotExist, tableNotExist)))
+ .satisfies(
+ anyCauseMatches(
+ TableNotExistException.class,
+ String.format(
+ "Table (or view) %s.%s does not exist
in Catalog",
+ databaseNotExist, tableNotExist)));
+ }
+
+ @Test
+ void testGetTable() throws TableNotExistException {
+ CatalogBaseTable table = catalog.getTable(new ObjectPath(TEST_DB,
TEST_TABLE_ALL_TYPES));
+ assertThat(table.getUnresolvedSchema()).isEqualTo(TABLE_SCHEMA);
+ }
+
+ @Test
+ void testGetTablePrimaryKey() throws TableNotExistException {
+ // test the PK of test.t_user
+ Schema tableSchemaTestPK1 =
+ Schema.newBuilder()
+ .column("uid", DataTypes.BIGINT().notNull())
+ .column("col_bigint", DataTypes.BIGINT())
+ .primaryKeyNamed("PRIMARY",
Collections.singletonList("uid"))
+ .build();
+ CatalogBaseTable tablePK1 = catalog.getTable(new ObjectPath(TEST_DB,
TEST_TABLE_PK));
+ assertThat(tableSchemaTestPK1.getPrimaryKey().get())
+
.isEqualTo(tablePK1.getUnresolvedSchema().getPrimaryKey().get());
+
+ // test the PK of test2.t_user
+ Schema tableSchemaTestPK2 =
+ Schema.newBuilder()
+ .column("pid", DataTypes.INT().notNull())
+ .column("col_varchar", DataTypes.VARCHAR(255))
+ .primaryKeyNamed("PRIMARY",
Collections.singletonList("pid"))
+ .build();
+ CatalogBaseTable tablePK2 = catalog.getTable(new ObjectPath(TEST_DB2,
TEST_TABLE_PK));
+ assertThat(tableSchemaTestPK2.getPrimaryKey().get())
+
.isEqualTo(tablePK2.getUnresolvedSchema().getPrimaryKey().get());
Review Comment:
```suggestion
assertThat(tableSchemaTestPK2.getPrimaryKey()).contains(tablePK2.getUnresolvedSchema().getPrimaryKey().get());
```
nit: there is contains for optional in assertj
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]