wolfboys commented on code in PR #3955:
URL: https://github.com/apache/flink-cdc/pull/3955#discussion_r2001483266


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java:
##########
@@ -118,33 +121,57 @@ public static MySqlDatabaseSchema 
createMySqlDatabaseSchema(
 
     /** Fetch current binlog offsets in MySql Server. */
     public static BinlogOffset currentBinlogOffset(JdbcConnection jdbc) {
-        final String showMasterStmt = "SHOW MASTER STATUS";
         try {
-            return jdbc.queryAndMap(
-                    showMasterStmt,
+            return queryBinlogStatus(
+                    jdbc,
                     rs -> {
-                        if (rs.next()) {
-                            final String binlogFilename = rs.getString(1);
-                            final long binlogPosition = rs.getLong(2);
-                            final String gtidSet =
-                                    rs.getMetaData().getColumnCount() > 4 ? 
rs.getString(5) : null;
-                            return BinlogOffset.builder()
-                                    .setBinlogFilePosition(binlogFilename, 
binlogPosition)
-                                    .setGtidSet(gtidSet)
-                                    .build();
-                        } else {
-                            throw new FlinkRuntimeException(
-                                    "Cannot read the binlog filename and 
position via '"
-                                            + showMasterStmt
-                                            + "'. Make sure your server is 
correctly configured");
+                        try {
+                            if (rs.next()) {
+                                final String binlogFilename = rs.getString(1);
+                                final long binlogPosition = rs.getLong(2);
+                                final String gtidSet =
+                                        rs.getMetaData().getColumnCount() > 4
+                                                ? rs.getString(5)
+                                                : null;
+                                return BinlogOffset.builder()
+                                        .setBinlogFilePosition(binlogFilename, 
binlogPosition)
+                                        .setGtidSet(gtidSet)
+                                        .build();
+                            } else {
+                                throw new ConfigurationException(
+                                        "Cannot read the binlog filename and 
position. Make sure your server is correctly configured");
+                            }
+                        } catch (Exception e) {
+                            throw new FlinkRuntimeException(e);
                         }
                     });
         } catch (SQLException e) {
-            throw new FlinkRuntimeException(
-                    "Cannot read the binlog filename and position via '"
-                            + showMasterStmt
-                            + "'. Make sure your server is correctly 
configured",
-                    e);
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    public static <T> T queryBinlogStatus(
+            JdbcConnection connection, Function<ResultSet, T> callback) throws 
SQLException {
+        String showMasterStmt = "SHOW MASTER STATUS";

Review Comment:
   Thanks for your review. If this method is called frequently, we should look 
for ways to improve it.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to