dyp12 commented on code in PR #9735:
URL: https://github.com/apache/seatunnel/pull/9735#discussion_r2289794700
##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java:
##########
@@ -204,6 +229,81 @@ public static BinlogOffset getBinlogPosition(Map<String,
?> offset) {
}
}
+ private class TimestampFilterMySqlStreamingChangeEventSource
+ extends MySqlStreamingChangeEventSource {
+
+ private final Long targetTimestamp;
+ private long logTimestamp;
+ private boolean loggedWaitingMessage;
+ private final long LOG_INTERVAL_MS = 10000;
+
+ public TimestampFilterMySqlStreamingChangeEventSource(
+ MySqlConnectorConfig connectorConfig,
+ MySqlConnection connection,
+ JdbcSourceEventDispatcher<MySqlPartition> dispatcher,
+ ErrorHandler errorHandler,
+ Clock clock,
+ MySqlTaskContext taskContext,
+ MySqlStreamingChangeEventSourceMetrics metrics,
+ Long targetTimestamp) {
+ super(
+ connectorConfig,
+ connection,
+ dispatcher,
+ errorHandler,
+ clock,
+ taskContext,
+ metrics);
+ this.targetTimestamp = targetTimestamp;
+ }
+
+ @Override
+ protected void handleEvent(
+ MySqlPartition partition, MySqlOffsetContext offsetContext,
Event event) {
+ if (event == null) {
+ super.handleEvent(partition, offsetContext, event);
+ return;
+ }
+
+ long eventTs = event.getHeader().getTimestamp();
+ if (eventTs == 0 || targetTimestamp == null || targetTimestamp ==
0) {
+ super.handleEvent(partition, offsetContext, event);
+ return;
+ }
+ boolean shouldSkip = eventTs < targetTimestamp;
+ if (shouldSkip) {
+ if (!loggedWaitingMessage) {
+ log.info(
+ "skip binlog, currentTime:{}, filterTime:{}",
eventTs, targetTimestamp);
+ loggedWaitingMessage = true;
+ logTimestamp = eventTs;
+ }
+ if (eventTs - logTimestamp >= LOG_INTERVAL_MS) {
+ loggedWaitingMessage = false;
+ }
+ updateOffsetPosition(offsetContext, event.getHeader());
+ return;
+ }
+
+ super.handleEvent(partition, offsetContext, event);
+ }
+
+ private void updateOffsetPosition(
Review Comment:
> Could you explain why we must update `MySqlOffsetContext` when skip data?
update MySqlOffsetContext,then checkpoint will save latest offset,when
restore,only begin from save offset,not from earliest. in
MySqlStreamingChangeEventSource.handleEvent also update it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]