yuxiqian commented on code in PR #3955:
URL: https://github.com/apache/flink-cdc/pull/3955#discussion_r2001368371
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java:
##########
@@ -118,33 +121,57 @@ public static MySqlDatabaseSchema createMySqlDatabaseSchema(
/** Fetch current binlog offsets in MySql Server. */
public static BinlogOffset currentBinlogOffset(JdbcConnection jdbc) {
- final String showMasterStmt = "SHOW MASTER STATUS";
try {
- return jdbc.queryAndMap(
- showMasterStmt,
+ return queryBinlogStatus(
+ jdbc,
rs -> {
- if (rs.next()) {
- final String binlogFilename = rs.getString(1);
- final long binlogPosition = rs.getLong(2);
- final String gtidSet =
- rs.getMetaData().getColumnCount() > 4 ? rs.getString(5) : null;
- return BinlogOffset.builder()
- .setBinlogFilePosition(binlogFilename, binlogPosition)
- .setGtidSet(gtidSet)
- .build();
- } else {
- throw new FlinkRuntimeException(
- "Cannot read the binlog filename and position via '"
- + showMasterStmt
- + "'. Make sure your server is correctly configured");
+ try {
+ if (rs.next()) {
+ final String binlogFilename = rs.getString(1);
+ final long binlogPosition = rs.getLong(2);
+ final String gtidSet =
+ rs.getMetaData().getColumnCount() > 4
+ ? rs.getString(5)
+ : null;
+ return BinlogOffset.builder()
+ .setBinlogFilePosition(binlogFilename, binlogPosition)
+ .setGtidSet(gtidSet)
+ .build();
+ } else {
+ throw new ConfigurationException(
+ "Cannot read the binlog filename and position. Make sure your server is correctly configured");
+ }
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(e);
}
});
} catch (SQLException e) {
- throw new FlinkRuntimeException(
- "Cannot read the binlog filename and position via '"
- + showMasterStmt
- + "'. Make sure your server is correctly configured",
- e);
+ throw new FlinkRuntimeException(e);
+ }
+ }
+
+ public static <T> T queryBinlogStatus(
+ JdbcConnection connection, Function<ResultSet, T> callback) throws SQLException {
+ String showMasterStmt = "SHOW MASTER STATUS";
Review Comment:
Could we detect the MySQL version just once, and avoid the trial-and-error
check every time?
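
A rough sketch of what "detect once" could look like, in case it helps the discussion. Everything here is hypothetical and not existing flink-cdc code: the class name `BinlogStatusStatementResolver`, the `versionSupplier` (which stands in for running something like `SELECT VERSION()` on the connection), and the 8.4 cut-off are assumptions. The cut-off reflects that `SHOW MASTER STATUS` was removed in MySQL 8.4 in favor of `SHOW BINARY LOG STATUS`; servers reporting a MariaDB version string are left on the legacy statement.

```java
import java.util.Locale;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: resolves the binlog status statement once and caches it. */
final class BinlogStatusStatementResolver {
    static final String LEGACY_STMT = "SHOW MASTER STATUS";
    static final String NEW_STMT = "SHOW BINARY LOG STATUS";

    private static final Pattern MAJOR_MINOR = Pattern.compile("^(\\d+)\\.(\\d+)");

    // Assumed to wrap a server round trip such as "SELECT VERSION()".
    private final Supplier<String> versionSupplier;
    private volatile String cachedStatement;

    BinlogStatusStatementResolver(Supplier<String> versionSupplier) {
        this.versionSupplier = versionSupplier;
    }

    /** Returns the statement to run; the version is fetched only on first use. */
    String statement() {
        String stmt = cachedStatement;
        if (stmt == null) {
            synchronized (this) {
                stmt = cachedStatement;
                if (stmt == null) {
                    stmt = chooseStatement(versionSupplier.get());
                    cachedStatement = stmt;
                }
            }
        }
        return stmt;
    }

    /** Pure mapping from a version string (e.g. "8.4.0", "5.7.44-log") to a statement. */
    static String chooseStatement(String version) {
        if (version == null || version.toLowerCase(Locale.ROOT).contains("mariadb")) {
            return LEGACY_STMT;
        }
        Matcher m = MAJOR_MINOR.matcher(version.trim());
        if (!m.find()) {
            // Unparseable version: fall back to the statement older servers accept.
            return LEGACY_STMT;
        }
        int major = Integer.parseInt(m.group(1));
        int minor = Integer.parseInt(m.group(2));
        boolean atLeast84 = major > 8 || (major == 8 && minor >= 4);
        return atLeast84 ? NEW_STMT : LEGACY_STMT;
    }
}
```

With something like this, `queryBinlogStatus` could ask a shared resolver for the statement instead of catching the failure of `SHOW MASTER STATUS` on every call. Where the resolver instance lives (per connection, per source config) is left open here.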
##########
docs/content.zh/docs/faq/faq.md:
##########
Review Comment:
MySQL 8.4+ support would be great, but it still needs to be validated. Shall
we parameterize the MySQL unit and E2E test cases across multiple LTS versions
(including 5.7, 8.0, and 8.4), as MongoDB CDC does?