loserwang1024 commented on code in PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1610861124
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java:
##########
@@ -217,6 +220,11 @@ public JdbcSourceFetchTaskContext
createFetchTaskContext(JdbcSourceConfig taskSo
@Override
public void notifyCheckpointComplete(long checkpointId, Offset offset)
throws Exception {
Review Comment:
What about do it in
IncrementalSourceReaderWithCommit#notifyCheckpointComplete and
PostgresSourceReader#notifyCheckpointComplete. Reader control when and whether
to commit offset, while dialect just support ability to do it.
And when put into reader, can just use a long rather than AtomicLong. so we
can set checkpointCount = (checkpointCount+1)%3
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java:
##########
@@ -100,7 +102,9 @@ public PostgresSourceConfig create(int subtaskId) {
// The PostgresSource will do snapshot according to its StartupMode.
// Do not need debezium to do the snapshot work.
- props.put("snapshot.mode", "never");
+ props.setProperty("snapshot.mode", "never");
+
+ props.setProperty("checkpoint.cycle", String.valueOf(checkpointCycle));
Review Comment:
I don't know what "checkpoint.cycle" does? Debezium's offet commit cycle?
Flink cdc have already been responsible for submitting offset, please not let
Debezium do it again(turn off)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]