yuxiqian commented on code in PR #3432:
URL: https://github.com/apache/flink-cdc/pull/3432#discussion_r1650472226
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java:
##########
@@ -324,11 +325,17 @@ private static long getBinlogTimestamp(BinaryLogClient
client, String binlogFile
client.setBinlogFilename(binlogFile);
client.setBinlogPosition(0);
- LOG.info("begin parse binlog: {}", binlogFile);
+ LOG.info("Begin parse binlog: {}", binlogFile);
client.connect();
} finally {
client.unregisterEventListener(eventListener);
}
- return binlogTimestamps.take();
+
+ Long timestamp = binlogTimestamps.poll(5, TimeUnit.SECONDS);
+ if (timestamp == null) {
+ timestamp = 0L;
Review Comment:
Is `0L` a safe fallback value here? Seems the return value of
`getBinlogTimestamp` will be used for binary searching in `searchBinlogName`.
If midTs is zero, `midTs < targetMs` will always be true, and the earlier half
binlog will be dropped, which might not be safe since we may miss some binlogs
in given range.
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java:
##########
Review Comment:
It would be nice if we can mock a `BinaryLogClient` and add a timeout test
to verify this change.
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java:
##########
@@ -324,11 +325,17 @@ private static long getBinlogTimestamp(BinaryLogClient
client, String binlogFile
client.setBinlogFilename(binlogFile);
client.setBinlogPosition(0);
- LOG.info("begin parse binlog: {}", binlogFile);
+ LOG.info("Begin parse binlog: {}", binlogFile);
client.connect();
} finally {
client.unregisterEventListener(eventListener);
}
- return binlogTimestamps.take();
+
+ Long timestamp = binlogTimestamps.poll(5, TimeUnit.SECONDS);
+ if (timestamp == null) {
+ timestamp = 0L;
+ LOG.info("Failed to get binlog file {} 's timestamp within 5
seconds", binlogFile);
Review Comment:
`info`-level seems a bit too weak. Could be a `warning`?
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java:
##########
@@ -324,11 +325,17 @@ private static long getBinlogTimestamp(BinaryLogClient
client, String binlogFile
client.setBinlogFilename(binlogFile);
client.setBinlogPosition(0);
- LOG.info("begin parse binlog: {}", binlogFile);
+ LOG.info("Begin parse binlog: {}", binlogFile);
client.connect();
} finally {
client.unregisterEventListener(eventListener);
}
- return binlogTimestamps.take();
+
+ Long timestamp = binlogTimestamps.poll(5, TimeUnit.SECONDS);
Review Comment:
Extract this as a named constant?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]