slinkydeveloper commented on a change in pull request #18653:
URL: https://github.com/apache/flink/pull/18653#discussion_r803396778



##########
File path: 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
##########
@@ -43,134 +43,104 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.table.api.Expressions.row;
-import static org.junit.Assert.assertEquals;
+import java.util.Map;
 
 /**
- * Test unsigned type conversion between Flink and JDBC driver mysql, the test 
underlying use
- * MariaDB to mock a DB which use mysql driver too.
+ * Test unsigned type conversion between Flink and JDBC driver mysql, the test 
underlying use MySQL
+ * to mock a DB.
  */
 public class UnsignedTypeConversionITCase extends AbstractTestBase {
 
-    private static final Logger logger =
+    private static final Logger LOGGER =
             LoggerFactory.getLogger(UnsignedTypeConversionITCase.class);
+
+    private static final DockerImageName MYSQL_57_IMAGE = 
DockerImageName.parse("mysql:5.7.34");
     private static final String DEFAULT_DB_NAME = "test";
     private static final String TABLE_NAME = "unsigned_test";
-    private static final int INITIALIZE_DB_MAX_RETRY = 3;
-    private static DB db;
-    private static String dbUrl;
-    private static Connection connection;
-
-    private StreamTableEnvironment tEnv;
-
-    @BeforeClass
-    public static void prepareMariaDB() throws IllegalStateException {
-        boolean initDbSuccess = false;
-        int i = 0;
-        // The initialization of maria db instance is a little unstable 
according to past CI tests.
-        // Add retry logic here to avoid initialization failure.
-        while (i < INITIALIZE_DB_MAX_RETRY) {
-            try {
-                db = 
DB.newEmbeddedDB(DBConfigurationBuilder.newBuilder().build());
-                db.start();
-                dbUrl = db.getConfiguration().getURL(DEFAULT_DB_NAME);
-                connection = DriverManager.getConnection(dbUrl);
-                try (Statement statement = connection.createStatement()) {
-                    statement.execute("CREATE DATABASE IF NOT EXISTS `" + 
DEFAULT_DB_NAME + "`;");
-                    ResultSet resultSet =
-                            statement.executeQuery(
-                                    "SELECT SCHEMA_NAME FROM "
-                                            + "INFORMATION_SCHEMA.SCHEMATA 
WHERE SCHEMA_NAME = '"
-                                            + DEFAULT_DB_NAME
-                                            + "';");
-                    if (resultSet.next()) {
-                        String dbName = resultSet.getString(1);
-                        initDbSuccess = 
DEFAULT_DB_NAME.equalsIgnoreCase(dbName);
-                    }
+    private static final String USER = "root";
+    private static final String PASSWORD = "";
+    private static final String COLUMNS =
+            "tiny_c, tiny_un_c, small_c, small_un_c, int_c, int_un_c, big_c, 
big_un_c";
+
+    private static final Map<String, String> DEFAULT_CONTAINER_ENV_MAP =
+            new HashMap<String, String>() {
+                {
+                    put("MYSQL_ROOT_HOST", "%");
                 }
-            } catch (Exception e) {
-                logger.warn("Initialize DB failed.", e);
-                stopDb();
-            }
-            if (initDbSuccess) {
-                break;
-            }
-            i++;
-        }
-        if (!initDbSuccess) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Initialize MySQL database instance failed after 
%d attempts,"
-                                    + " please open an issue.",
-                            INITIALIZE_DB_MAX_RETRY));
-        }
-    }
-
-    @Before
-    public void setUp() throws SQLException, IllegalStateException {
-        // dbUrl: jdbc:mysql://localhost:3306/test
-        createMysqlTable();
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        tEnv = StreamTableEnvironment.create(env);
-        createFlinkTable();
-        prepareData();
-    }
+            };
+
+    private static final Object[] ROW =
+            new Object[] {
+                (byte) 127,
+                (short) 255,
+                (short) 32767,
+                65535,
+                2147483647,
+                4294967295L,
+                9223372036854775807L,
+                new BigDecimal("18446744073709551615")
+            };
+
+    @ClassRule
+    public static final MySQLContainer<?> MYSQL_CONTAINER =
+            new MySQLContainer<>(MYSQL_57_IMAGE)
+                    .withEnv(DEFAULT_CONTAINER_ENV_MAP)
+                    .withUsername(USER)
+                    .withPassword(PASSWORD)
+                    .withDatabaseName(DEFAULT_DB_NAME)
+                    .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+
+    private Connection connection;
+    private StreamTableEnvironment tEnv;
 
     @Test
     public void testUnsignedType() throws Exception {
+        prepare();
+
         // write data to db
-        tEnv.executeSql(
-                        "insert into jdbc_sink select"
-                                + " tiny_c,"
-                                + " tiny_un_c,"
-                                + " small_c,"
-                                + " small_un_c ,"
-                                + " int_c,"
-                                + " int_un_c,"
-                                + " big_c ,"
-                                + " big_un_c from data")
+        tEnv.executeSql(String.format("insert into jdbc_sink select %s from 
data", COLUMNS))
                 .await();
 
         // read data from db using jdbc connection and compare
         PreparedStatement query =
                 connection.prepareStatement(
-                        String.format(
-                                "select tiny_c, tiny_un_c, small_c, 
small_un_c,"
-                                        + " int_c, int_un_c, big_c, big_un_c 
from %s",
-                                TABLE_NAME));
+                        String.format("select %s from %s", COLUMNS, 
TABLE_NAME));
         ResultSet resultSet = query.executeQuery();
         while (resultSet.next()) {
-            assertEquals(Integer.valueOf(127), resultSet.getObject("tiny_c"));
-            assertEquals(Integer.valueOf(255), 
resultSet.getObject("tiny_un_c"));
-            assertEquals(Integer.valueOf(32767), 
resultSet.getObject("small_c"));
-            assertEquals(Integer.valueOf(65535), 
resultSet.getObject("small_un_c"));
-            assertEquals(Integer.valueOf(2147483647), 
resultSet.getObject("int_c"));
-            assertEquals(Long.valueOf(4294967295L), 
resultSet.getObject("int_un_c"));
-            assertEquals(Long.valueOf(9223372036854775807L), 
resultSet.getObject("big_c"));
-            assertEquals(new BigInteger("18446744073709551615"), 
resultSet.getObject("big_un_c"));
+            
Assertions.assertThat(resultSet.getObject("tiny_c")).isEqualTo(127);

Review comment:
       Please remove the `Assertions.` prefix from these calls by statically importing `assertThat`.

##########
File path: 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
##########
@@ -43,134 +43,104 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.table.api.Expressions.row;
-import static org.junit.Assert.assertEquals;
+import java.util.Map;
 
 /**
- * Test unsigned type conversion between Flink and JDBC driver mysql, the test 
underlying use
- * MariaDB to mock a DB which use mysql driver too.
+ * Test unsigned type conversion between Flink and JDBC driver mysql, the test 
underlying use MySQL
+ * to mock a DB.
  */
 public class UnsignedTypeConversionITCase extends AbstractTestBase {
 
-    private static final Logger logger =
+    private static final Logger LOGGER =
             LoggerFactory.getLogger(UnsignedTypeConversionITCase.class);
+
+    private static final DockerImageName MYSQL_57_IMAGE = 
DockerImageName.parse("mysql:5.7.34");
     private static final String DEFAULT_DB_NAME = "test";
     private static final String TABLE_NAME = "unsigned_test";
-    private static final int INITIALIZE_DB_MAX_RETRY = 3;
-    private static DB db;
-    private static String dbUrl;
-    private static Connection connection;
-
-    private StreamTableEnvironment tEnv;
-
-    @BeforeClass
-    public static void prepareMariaDB() throws IllegalStateException {
-        boolean initDbSuccess = false;
-        int i = 0;
-        // The initialization of maria db instance is a little unstable 
according to past CI tests.
-        // Add retry logic here to avoid initialization failure.
-        while (i < INITIALIZE_DB_MAX_RETRY) {
-            try {
-                db = 
DB.newEmbeddedDB(DBConfigurationBuilder.newBuilder().build());
-                db.start();
-                dbUrl = db.getConfiguration().getURL(DEFAULT_DB_NAME);
-                connection = DriverManager.getConnection(dbUrl);
-                try (Statement statement = connection.createStatement()) {
-                    statement.execute("CREATE DATABASE IF NOT EXISTS `" + 
DEFAULT_DB_NAME + "`;");
-                    ResultSet resultSet =
-                            statement.executeQuery(
-                                    "SELECT SCHEMA_NAME FROM "
-                                            + "INFORMATION_SCHEMA.SCHEMATA 
WHERE SCHEMA_NAME = '"
-                                            + DEFAULT_DB_NAME
-                                            + "';");
-                    if (resultSet.next()) {
-                        String dbName = resultSet.getString(1);
-                        initDbSuccess = 
DEFAULT_DB_NAME.equalsIgnoreCase(dbName);
-                    }
+    private static final String USER = "root";
+    private static final String PASSWORD = "";
+    private static final String COLUMNS =
+            "tiny_c, tiny_un_c, small_c, small_un_c, int_c, int_un_c, big_c, 
big_un_c";
+
+    private static final Map<String, String> DEFAULT_CONTAINER_ENV_MAP =
+            new HashMap<String, String>() {
+                {
+                    put("MYSQL_ROOT_HOST", "%");
                 }
-            } catch (Exception e) {
-                logger.warn("Initialize DB failed.", e);
-                stopDb();
-            }
-            if (initDbSuccess) {
-                break;
-            }
-            i++;
-        }
-        if (!initDbSuccess) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Initialize MySQL database instance failed after 
%d attempts,"
-                                    + " please open an issue.",
-                            INITIALIZE_DB_MAX_RETRY));
-        }
-    }
-
-    @Before
-    public void setUp() throws SQLException, IllegalStateException {
-        // dbUrl: jdbc:mysql://localhost:3306/test
-        createMysqlTable();
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        tEnv = StreamTableEnvironment.create(env);
-        createFlinkTable();
-        prepareData();
-    }
+            };
+
+    private static final Object[] ROW =
+            new Object[] {
+                (byte) 127,
+                (short) 255,
+                (short) 32767,
+                65535,
+                2147483647,
+                4294967295L,
+                9223372036854775807L,
+                new BigDecimal("18446744073709551615")
+            };
+
+    @ClassRule
+    public static final MySQLContainer<?> MYSQL_CONTAINER =
+            new MySQLContainer<>(MYSQL_57_IMAGE)
+                    .withEnv(DEFAULT_CONTAINER_ENV_MAP)
+                    .withUsername(USER)
+                    .withPassword(PASSWORD)
+                    .withDatabaseName(DEFAULT_DB_NAME)
+                    .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+
+    private Connection connection;
+    private StreamTableEnvironment tEnv;
 
     @Test
     public void testUnsignedType() throws Exception {
+        prepare();
+
         // write data to db
-        tEnv.executeSql(
-                        "insert into jdbc_sink select"
-                                + " tiny_c,"
-                                + " tiny_un_c,"
-                                + " small_c,"
-                                + " small_un_c ,"
-                                + " int_c,"
-                                + " int_un_c,"
-                                + " big_c ,"
-                                + " big_un_c from data")
+        tEnv.executeSql(String.format("insert into jdbc_sink select %s from 
data", COLUMNS))
                 .await();
 
         // read data from db using jdbc connection and compare
         PreparedStatement query =
                 connection.prepareStatement(
-                        String.format(
-                                "select tiny_c, tiny_un_c, small_c, 
small_un_c,"
-                                        + " int_c, int_un_c, big_c, big_un_c 
from %s",
-                                TABLE_NAME));
+                        String.format("select %s from %s", COLUMNS, 
TABLE_NAME));
         ResultSet resultSet = query.executeQuery();
         while (resultSet.next()) {
-            assertEquals(Integer.valueOf(127), resultSet.getObject("tiny_c"));
-            assertEquals(Integer.valueOf(255), 
resultSet.getObject("tiny_un_c"));
-            assertEquals(Integer.valueOf(32767), 
resultSet.getObject("small_c"));
-            assertEquals(Integer.valueOf(65535), 
resultSet.getObject("small_un_c"));
-            assertEquals(Integer.valueOf(2147483647), 
resultSet.getObject("int_c"));
-            assertEquals(Long.valueOf(4294967295L), 
resultSet.getObject("int_un_c"));
-            assertEquals(Long.valueOf(9223372036854775807L), 
resultSet.getObject("big_c"));
-            assertEquals(new BigInteger("18446744073709551615"), 
resultSet.getObject("big_un_c"));
+            
Assertions.assertThat(resultSet.getObject("tiny_c")).isEqualTo(127);
+            
Assertions.assertThat(resultSet.getObject("tiny_un_c")).isEqualTo(255);
+            
Assertions.assertThat(resultSet.getObject("small_c")).isEqualTo(32767);
+            
Assertions.assertThat(resultSet.getObject("small_un_c")).isEqualTo(65535);
+            
Assertions.assertThat(resultSet.getObject("int_c")).isEqualTo(2147483647);
+            
Assertions.assertThat(resultSet.getObject("int_un_c")).isEqualTo(4294967295L);
+            
Assertions.assertThat(resultSet.getObject("big_c")).isEqualTo(9223372036854775807L);
+            Assertions.assertThat(resultSet.getObject("big_un_c"))
+                    .isEqualTo(new BigInteger("18446744073709551615"));
         }
 
         // read data from db using flink and compare
         Iterator<Row> collected =
-                tEnv.executeSql(
-                                "select tiny_c, tiny_un_c, small_c, 
small_un_c,"
-                                        + " int_c, int_un_c, big_c, big_un_c 
from jdbc_source")
-                        .collect();
-        List<String> result =
-                CollectionUtil.iteratorToList(collected).stream()
-                        .map(Row::toString)
-                        .sorted()
-                        .collect(Collectors.toList());
-        List<String> expected =
-                Collections.singletonList(
-                        "+I[127, 255, 32767, 65535, 2147483647, 4294967295, 
9223372036854775807, 18446744073709551615]");
-        assertEquals(expected, result);
+                tEnv.executeSql(String.format("select %s from jdbc_source", 
COLUMNS)).collect();
+        List<Row> result = CollectionUtil.iteratorToList(collected);
+        List<Row> expected = 
Collections.singletonList(Row.ofKind(RowKind.INSERT, ROW));
+        Assertions.assertThat(result).isEqualTo(expected);
+
+        connection.close();
+    }
+
+    private void prepare() throws Exception {
+        MYSQL_CONTAINER.start();

Review comment:
       You don't need this explicit `start()` call; the `@ClassRule` annotation on `MYSQL_CONTAINER` will start the container for you.

##########
File path: 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
##########
@@ -43,134 +43,104 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.table.api.Expressions.row;
-import static org.junit.Assert.assertEquals;
+import java.util.Map;
 
 /**
- * Test unsigned type conversion between Flink and JDBC driver mysql, the test 
underlying use
- * MariaDB to mock a DB which use mysql driver too.
+ * Test unsigned type conversion between Flink and JDBC driver mysql, the test 
underlying use MySQL
+ * to mock a DB.
  */
 public class UnsignedTypeConversionITCase extends AbstractTestBase {
 
-    private static final Logger logger =
+    private static final Logger LOGGER =
             LoggerFactory.getLogger(UnsignedTypeConversionITCase.class);
+
+    private static final DockerImageName MYSQL_57_IMAGE = 
DockerImageName.parse("mysql:5.7.34");
     private static final String DEFAULT_DB_NAME = "test";
     private static final String TABLE_NAME = "unsigned_test";
-    private static final int INITIALIZE_DB_MAX_RETRY = 3;
-    private static DB db;
-    private static String dbUrl;
-    private static Connection connection;
-
-    private StreamTableEnvironment tEnv;
-
-    @BeforeClass
-    public static void prepareMariaDB() throws IllegalStateException {
-        boolean initDbSuccess = false;
-        int i = 0;
-        // The initialization of maria db instance is a little unstable 
according to past CI tests.
-        // Add retry logic here to avoid initialization failure.
-        while (i < INITIALIZE_DB_MAX_RETRY) {
-            try {
-                db = 
DB.newEmbeddedDB(DBConfigurationBuilder.newBuilder().build());
-                db.start();
-                dbUrl = db.getConfiguration().getURL(DEFAULT_DB_NAME);
-                connection = DriverManager.getConnection(dbUrl);
-                try (Statement statement = connection.createStatement()) {
-                    statement.execute("CREATE DATABASE IF NOT EXISTS `" + 
DEFAULT_DB_NAME + "`;");
-                    ResultSet resultSet =
-                            statement.executeQuery(
-                                    "SELECT SCHEMA_NAME FROM "
-                                            + "INFORMATION_SCHEMA.SCHEMATA 
WHERE SCHEMA_NAME = '"
-                                            + DEFAULT_DB_NAME
-                                            + "';");
-                    if (resultSet.next()) {
-                        String dbName = resultSet.getString(1);
-                        initDbSuccess = 
DEFAULT_DB_NAME.equalsIgnoreCase(dbName);
-                    }
+    private static final String USER = "root";
+    private static final String PASSWORD = "";
+    private static final String COLUMNS =

Review comment:
       Can you make this variable a list using `Arrays.asList`, for clarity?

##########
File path: 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
##########
@@ -43,134 +43,104 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.table.api.Expressions.row;
-import static org.junit.Assert.assertEquals;
+import java.util.Map;
 
 /**
- * Test unsigned type conversion between Flink and JDBC driver mysql, the test 
underlying use
- * MariaDB to mock a DB which use mysql driver too.
+ * Test unsigned type conversion between Flink and JDBC driver mysql, the test 
underlying use MySQL
+ * to mock a DB.
  */
 public class UnsignedTypeConversionITCase extends AbstractTestBase {
 
-    private static final Logger logger =
+    private static final Logger LOGGER =
             LoggerFactory.getLogger(UnsignedTypeConversionITCase.class);
+
+    private static final DockerImageName MYSQL_57_IMAGE = 
DockerImageName.parse("mysql:5.7.34");
     private static final String DEFAULT_DB_NAME = "test";
     private static final String TABLE_NAME = "unsigned_test";
-    private static final int INITIALIZE_DB_MAX_RETRY = 3;
-    private static DB db;
-    private static String dbUrl;
-    private static Connection connection;
-
-    private StreamTableEnvironment tEnv;
-
-    @BeforeClass
-    public static void prepareMariaDB() throws IllegalStateException {
-        boolean initDbSuccess = false;
-        int i = 0;
-        // The initialization of maria db instance is a little unstable 
according to past CI tests.
-        // Add retry logic here to avoid initialization failure.
-        while (i < INITIALIZE_DB_MAX_RETRY) {
-            try {
-                db = 
DB.newEmbeddedDB(DBConfigurationBuilder.newBuilder().build());
-                db.start();
-                dbUrl = db.getConfiguration().getURL(DEFAULT_DB_NAME);
-                connection = DriverManager.getConnection(dbUrl);
-                try (Statement statement = connection.createStatement()) {
-                    statement.execute("CREATE DATABASE IF NOT EXISTS `" + 
DEFAULT_DB_NAME + "`;");
-                    ResultSet resultSet =
-                            statement.executeQuery(
-                                    "SELECT SCHEMA_NAME FROM "
-                                            + "INFORMATION_SCHEMA.SCHEMATA 
WHERE SCHEMA_NAME = '"
-                                            + DEFAULT_DB_NAME
-                                            + "';");
-                    if (resultSet.next()) {
-                        String dbName = resultSet.getString(1);
-                        initDbSuccess = 
DEFAULT_DB_NAME.equalsIgnoreCase(dbName);
-                    }
+    private static final String USER = "root";
+    private static final String PASSWORD = "";
+    private static final String COLUMNS =
+            "tiny_c, tiny_un_c, small_c, small_un_c, int_c, int_un_c, big_c, 
big_un_c";
+
+    private static final Map<String, String> DEFAULT_CONTAINER_ENV_MAP =
+            new HashMap<String, String>() {
+                {
+                    put("MYSQL_ROOT_HOST", "%");
                 }
-            } catch (Exception e) {
-                logger.warn("Initialize DB failed.", e);
-                stopDb();
-            }
-            if (initDbSuccess) {
-                break;
-            }
-            i++;
-        }
-        if (!initDbSuccess) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Initialize MySQL database instance failed after 
%d attempts,"
-                                    + " please open an issue.",
-                            INITIALIZE_DB_MAX_RETRY));
-        }
-    }
-
-    @Before
-    public void setUp() throws SQLException, IllegalStateException {
-        // dbUrl: jdbc:mysql://localhost:3306/test
-        createMysqlTable();
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        tEnv = StreamTableEnvironment.create(env);
-        createFlinkTable();
-        prepareData();
-    }
+            };
+
+    private static final Object[] ROW =
+            new Object[] {
+                (byte) 127,
+                (short) 255,
+                (short) 32767,
+                65535,
+                2147483647,
+                4294967295L,
+                9223372036854775807L,
+                new BigDecimal("18446744073709551615")
+            };
+
+    @ClassRule
+    public static final MySQLContainer<?> MYSQL_CONTAINER =
+            new MySQLContainer<>(MYSQL_57_IMAGE)
+                    .withEnv(DEFAULT_CONTAINER_ENV_MAP)
+                    .withUsername(USER)
+                    .withPassword(PASSWORD)
+                    .withDatabaseName(DEFAULT_DB_NAME)
+                    .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+
+    private Connection connection;
+    private StreamTableEnvironment tEnv;
 
     @Test
     public void testUnsignedType() throws Exception {
+        prepare();
+
         // write data to db
-        tEnv.executeSql(
-                        "insert into jdbc_sink select"
-                                + " tiny_c,"
-                                + " tiny_un_c,"
-                                + " small_c,"
-                                + " small_un_c ,"
-                                + " int_c,"
-                                + " int_un_c,"
-                                + " big_c ,"
-                                + " big_un_c from data")
+        tEnv.executeSql(String.format("insert into jdbc_sink select %s from 
data", COLUMNS))
                 .await();
 
         // read data from db using jdbc connection and compare
         PreparedStatement query =
                 connection.prepareStatement(
-                        String.format(
-                                "select tiny_c, tiny_un_c, small_c, 
small_un_c,"
-                                        + " int_c, int_un_c, big_c, big_un_c 
from %s",
-                                TABLE_NAME));
+                        String.format("select %s from %s", COLUMNS, 
TABLE_NAME));
         ResultSet resultSet = query.executeQuery();
         while (resultSet.next()) {
-            assertEquals(Integer.valueOf(127), resultSet.getObject("tiny_c"));
-            assertEquals(Integer.valueOf(255), 
resultSet.getObject("tiny_un_c"));
-            assertEquals(Integer.valueOf(32767), 
resultSet.getObject("small_c"));
-            assertEquals(Integer.valueOf(65535), 
resultSet.getObject("small_un_c"));
-            assertEquals(Integer.valueOf(2147483647), 
resultSet.getObject("int_c"));
-            assertEquals(Long.valueOf(4294967295L), 
resultSet.getObject("int_un_c"));
-            assertEquals(Long.valueOf(9223372036854775807L), 
resultSet.getObject("big_c"));
-            assertEquals(new BigInteger("18446744073709551615"), 
resultSet.getObject("big_un_c"));
+            
Assertions.assertThat(resultSet.getObject("tiny_c")).isEqualTo(127);
+            
Assertions.assertThat(resultSet.getObject("tiny_un_c")).isEqualTo(255);
+            
Assertions.assertThat(resultSet.getObject("small_c")).isEqualTo(32767);
+            
Assertions.assertThat(resultSet.getObject("small_un_c")).isEqualTo(65535);
+            
Assertions.assertThat(resultSet.getObject("int_c")).isEqualTo(2147483647);
+            
Assertions.assertThat(resultSet.getObject("int_un_c")).isEqualTo(4294967295L);
+            
Assertions.assertThat(resultSet.getObject("big_c")).isEqualTo(9223372036854775807L);
+            Assertions.assertThat(resultSet.getObject("big_un_c"))
+                    .isEqualTo(new BigInteger("18446744073709551615"));
         }
 
         // read data from db using flink and compare
         Iterator<Row> collected =
-                tEnv.executeSql(
-                                "select tiny_c, tiny_un_c, small_c, 
small_un_c,"
-                                        + " int_c, int_un_c, big_c, big_un_c 
from jdbc_source")
-                        .collect();
-        List<String> result =
-                CollectionUtil.iteratorToList(collected).stream()
-                        .map(Row::toString)
-                        .sorted()
-                        .collect(Collectors.toList());
-        List<String> expected =
-                Collections.singletonList(
-                        "+I[127, 255, 32767, 65535, 2147483647, 4294967295, 
9223372036854775807, 18446744073709551615]");
-        assertEquals(expected, result);
+                tEnv.executeSql(String.format("select %s from jdbc_source", 
COLUMNS)).collect();
+        List<Row> result = CollectionUtil.iteratorToList(collected);
+        List<Row> expected = 
Collections.singletonList(Row.ofKind(RowKind.INSERT, ROW));
+        Assertions.assertThat(result).isEqualTo(expected);

Review comment:
       Rather than `isEqualTo`, you can use `.containsOnly` here.

##########
File path: 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
##########
@@ -43,134 +43,104 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.table.api.Expressions.row;
-import static org.junit.Assert.assertEquals;
+import java.util.Map;
 
 /**
- * Test unsigned type conversion between Flink and JDBC driver mysql, the test 
underlying use
- * MariaDB to mock a DB which use mysql driver too.
+ * Test unsigned type conversion between Flink and JDBC driver mysql, the test 
underlying use MySQL
+ * to mock a DB.
  */
 public class UnsignedTypeConversionITCase extends AbstractTestBase {
 
-    private static final Logger logger =
+    private static final Logger LOGGER =
             LoggerFactory.getLogger(UnsignedTypeConversionITCase.class);
+
+    private static final DockerImageName MYSQL_57_IMAGE = 
DockerImageName.parse("mysql:5.7.34");
     private static final String DEFAULT_DB_NAME = "test";
     private static final String TABLE_NAME = "unsigned_test";
-    private static final int INITIALIZE_DB_MAX_RETRY = 3;
-    private static DB db;
-    private static String dbUrl;
-    private static Connection connection;
-
-    private StreamTableEnvironment tEnv;
-
-    @BeforeClass
-    public static void prepareMariaDB() throws IllegalStateException {
-        boolean initDbSuccess = false;
-        int i = 0;
-        // The initialization of maria db instance is a little unstable 
according to past CI tests.
-        // Add retry logic here to avoid initialization failure.
-        while (i < INITIALIZE_DB_MAX_RETRY) {
-            try {
-                db = 
DB.newEmbeddedDB(DBConfigurationBuilder.newBuilder().build());
-                db.start();
-                dbUrl = db.getConfiguration().getURL(DEFAULT_DB_NAME);
-                connection = DriverManager.getConnection(dbUrl);
-                try (Statement statement = connection.createStatement()) {
-                    statement.execute("CREATE DATABASE IF NOT EXISTS `" + 
DEFAULT_DB_NAME + "`;");
-                    ResultSet resultSet =
-                            statement.executeQuery(
-                                    "SELECT SCHEMA_NAME FROM "
-                                            + "INFORMATION_SCHEMA.SCHEMATA 
WHERE SCHEMA_NAME = '"
-                                            + DEFAULT_DB_NAME
-                                            + "';");
-                    if (resultSet.next()) {
-                        String dbName = resultSet.getString(1);
-                        initDbSuccess = 
DEFAULT_DB_NAME.equalsIgnoreCase(dbName);
-                    }
+    private static final String USER = "root";
+    private static final String PASSWORD = "";
+    private static final String COLUMNS =
+            "tiny_c, tiny_un_c, small_c, small_un_c, int_c, int_un_c, big_c, 
big_un_c";
+
+    private static final Map<String, String> DEFAULT_CONTAINER_ENV_MAP =
+            new HashMap<String, String>() {
+                {
+                    put("MYSQL_ROOT_HOST", "%");
                 }
-            } catch (Exception e) {
-                logger.warn("Initialize DB failed.", e);
-                stopDb();
-            }
-            if (initDbSuccess) {
-                break;
-            }
-            i++;
-        }
-        if (!initDbSuccess) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Initialize MySQL database instance failed after 
%d attempts,"
-                                    + " please open an issue.",
-                            INITIALIZE_DB_MAX_RETRY));
-        }
-    }
-
-    @Before
-    public void setUp() throws SQLException, IllegalStateException {
-        // dbUrl: jdbc:mysql://localhost:3306/test
-        createMysqlTable();
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        tEnv = StreamTableEnvironment.create(env);
-        createFlinkTable();
-        prepareData();
-    }
+            };
+
+    private static final Object[] ROW =
+            new Object[] {
+                (byte) 127,
+                (short) 255,
+                (short) 32767,
+                65535,
+                2147483647,
+                4294967295L,
+                9223372036854775807L,
+                new BigDecimal("18446744073709551615")
+            };
+
+    @ClassRule
+    public static final MySQLContainer<?> MYSQL_CONTAINER =
+            new MySQLContainer<>(MYSQL_57_IMAGE)
+                    .withEnv(DEFAULT_CONTAINER_ENV_MAP)
+                    .withUsername(USER)
+                    .withPassword(PASSWORD)
+                    .withDatabaseName(DEFAULT_DB_NAME)
+                    .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+
+    private Connection connection;
+    private StreamTableEnvironment tEnv;
 
     @Test
     public void testUnsignedType() throws Exception {
+        prepare();
+
         // write data to db
-        tEnv.executeSql(
-                        "insert into jdbc_sink select"
-                                + " tiny_c,"
-                                + " tiny_un_c,"
-                                + " small_c,"
-                                + " small_un_c ,"
-                                + " int_c,"
-                                + " int_un_c,"
-                                + " big_c ,"
-                                + " big_un_c from data")
+        tEnv.executeSql(String.format("insert into jdbc_sink select %s from 
data", COLUMNS))
                 .await();
 
         // read data from db using jdbc connection and compare
         PreparedStatement query =
                 connection.prepareStatement(
-                        String.format(
-                                "select tiny_c, tiny_un_c, small_c, 
small_un_c,"
-                                        + " int_c, int_un_c, big_c, big_un_c 
from %s",
-                                TABLE_NAME));
+                        String.format("select %s from %s", COLUMNS, 
TABLE_NAME));
         ResultSet resultSet = query.executeQuery();
         while (resultSet.next()) {
-            assertEquals(Integer.valueOf(127), resultSet.getObject("tiny_c"));
-            assertEquals(Integer.valueOf(255), 
resultSet.getObject("tiny_un_c"));
-            assertEquals(Integer.valueOf(32767), 
resultSet.getObject("small_c"));
-            assertEquals(Integer.valueOf(65535), 
resultSet.getObject("small_un_c"));
-            assertEquals(Integer.valueOf(2147483647), 
resultSet.getObject("int_c"));
-            assertEquals(Long.valueOf(4294967295L), 
resultSet.getObject("int_un_c"));
-            assertEquals(Long.valueOf(9223372036854775807L), 
resultSet.getObject("big_c"));
-            assertEquals(new BigInteger("18446744073709551615"), 
resultSet.getObject("big_un_c"));
+            
Assertions.assertThat(resultSet.getObject("tiny_c")).isEqualTo(127);
+            
Assertions.assertThat(resultSet.getObject("tiny_un_c")).isEqualTo(255);
+            
Assertions.assertThat(resultSet.getObject("small_c")).isEqualTo(32767);
+            
Assertions.assertThat(resultSet.getObject("small_un_c")).isEqualTo(65535);
+            
Assertions.assertThat(resultSet.getObject("int_c")).isEqualTo(2147483647);
+            
Assertions.assertThat(resultSet.getObject("int_un_c")).isEqualTo(4294967295L);
+            
Assertions.assertThat(resultSet.getObject("big_c")).isEqualTo(9223372036854775807L);
+            Assertions.assertThat(resultSet.getObject("big_un_c"))
+                    .isEqualTo(new BigInteger("18446744073709551615"));
         }
 
         // read data from db using flink and compare
         Iterator<Row> collected =
-                tEnv.executeSql(
-                                "select tiny_c, tiny_un_c, small_c, 
small_un_c,"
-                                        + " int_c, int_un_c, big_c, big_un_c 
from jdbc_source")
-                        .collect();
-        List<String> result =
-                CollectionUtil.iteratorToList(collected).stream()
-                        .map(Row::toString)
-                        .sorted()
-                        .collect(Collectors.toList());
-        List<String> expected =
-                Collections.singletonList(
-                        "+I[127, 255, 32767, 65535, 2147483647, 4294967295, 
9223372036854775807, 18446744073709551615]");
-        assertEquals(expected, result);
+                tEnv.executeSql(String.format("select %s from jdbc_source", 
COLUMNS)).collect();
+        List<Row> result = CollectionUtil.iteratorToList(collected);
+        List<Row> expected = 
Collections.singletonList(Row.ofKind(RowKind.INSERT, ROW));
+        Assertions.assertThat(result).isEqualTo(expected);
+
+        connection.close();

Review comment:
       Once you make `connection` a local variable, use a try-with-resources statement.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to