yuxiqian commented on code in PR #3850:
URL: https://github.com/apache/flink-cdc/pull/3850#discussion_r1912587247
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/internal/DebeziumChangeFetcher.java:
##########
@@ -268,8 +268,9 @@ private void emitRecordsUnderCheckpointLock(
synchronized (checkpointLock) {
T record;
while ((record = records.poll()) != null) {
+ // If the snapshot does not end or no latest data is entered,
-1 is reported
emitDelay =
- isInDbSnapshotPhase ? 0L : System.currentTimeMillis()
- messageTimestamp;
+ isInDbSnapshotPhase ? -1L : System.currentTimeMillis()
- messageTimestamp;
Review Comment:
Maybe we can verify this change by adding a test case similar to
`MySqlSourceITCase#testSourceMetrics`
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/internal/DebeziumChangeFetcher.java:
##########
@@ -248,8 +248,8 @@ private void handleBatch(List<ChangeEvent<SourceRecord,
SourceRecord>> changeEve
LOG.error("Failed to deserialize record {}", record, t);
throw t;
}
-
- if (isInDbSnapshotPhase && !isSnapshotRecord(record)) {
+ // Wait for the end of the snapshot phase for the first data to
trigger subsequent calculations.
+ if (isInDbSnapshotPhase && !isSnapshotRecord(record) &&
messageTimestamp !=0) {
Review Comment:
Please make sure that `mvn spotless:apply` / `mvn spotless:check` have been
executed before committing to resolve any codestyle violations
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]