Yizhou-Yang opened a new issue, #8817:
URL: https://github.com/apache/inlong/issues/8817

   ### Description
   
   1. Background:
   
   Currently the InLong connector source only implements a MySQL split fetcher, and the Ververica source has likewise only open-sourced a MySQL split fetcher, so the Postgres functionality has to be self-developed using the MySQL implementation as a reference. The MySQL implementation is based on the binlog; to bring the Postgres source to parity, a method to obtain Debezium splits without relying on a binlog needs to be developed.
   
   The current MySQL split fetcher is divided into two types: MySQLBinlogSplit and MySQLSnapshotSplit. PostgreSQL only needs to implement a snapshot split fetcher (in both schema and schemaless modes).
   
   PostgreSQL's Debezium CDC uses the LSN as the counterpart of the MySQL binlog position, so the implementation can track each data modification's position through an encapsulated PostgresOffset class.
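   
   A minimal sketch of that idea (the class name PostgresOffset matches the one proposed above, but the method names and parsing logic here are assumptions for illustration, not the actual connector API):
   
   ```java
   // Sketch of an offset wrapper around a Postgres LSN. Postgres prints
   // an LSN as two hex halves, e.g. "0/16B3748", which pack into one
   // unsigned 64-bit value, giving a total order analogous to the
   // file/position order of a MySQL binlog offset.
   public final class PostgresOffset implements Comparable<PostgresOffset> {
       private final long lsn; // 64-bit log sequence number
   
       private PostgresOffset(long lsn) {
           this.lsn = lsn;
       }
   
       public static PostgresOffset valueOf(String text) {
           String[] parts = text.split("/");
           long high = Long.parseLong(parts[0], 16);
           long low = Long.parseLong(parts[1], 16);
           return new PostgresOffset((high << 32) | low);
       }
   
       @Override
       public int compareTo(PostgresOffset other) {
           // LSNs are unsigned, so compare accordingly
           return Long.compareUnsigned(this.lsn, other.lsn);
       }
   
       @Override
       public String toString() {
           return String.format("%X/%X", lsn >>> 32, lsn & 0xFFFFFFFFL);
       }
   }
   ```
   
   With something like this in place, tracking the data modification position reduces to keeping the largest PostgresOffset seen so far.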
   
   This requirement is estimated at 20-30 person-days, but due to time pressure it needs to be delivered within 15 days.
   
   2. Solution: The relevant parameters of the control layer were implemented last week, so only the Oceanus connector remains to be implemented. The solution consists of three major modules: the Postgres state context, the Postgres LSN split fetcher, and the Postgres enumerator.
   
   In Debezium 1.8.0, the watermark required for implementing the fetcher is currently planned to be obtained through io.debezium.engine.format.OffsetContext, roughly as follows:
   
   ```java
   public PostgresOffset getWatermark(OffsetContext offsetContext, String column) {
       Map<String, ?> sourceOffset = offsetContext.offset().getSourceOffset();
       // do the necessary casting and initialization
       PostgresOffset highWatermark = (PostgresOffset) sourceOffset.get(column + ".max");
       return highWatermark;
   }
   ```
   
   The code above does not exist yet; it needs to be copied from the SourceEventEmitter and fine-tuned to generate a high watermark.
   
   A Postgres stateful context needs to be implemented to save and read state. Currently, the state cannot be turned into a context that can be shared. There is a PostgresSourceFetchTaskContext, but its state saving is not complete enough, and it needs to be fine-tuned.
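   
   As a sketch of what such a context could hold (all names here are hypothetical; in practice the existing PostgresSourceFetchTaskContext would be extended rather than replaced), the minimum is a map from split id to the last processed offset that both the fetcher and the checkpointing path can read:
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   
   // Hypothetical sketch of a shared stateful context: it records the
   // last processed LSN (in textual form) per split so the state can be
   // saved on checkpoint and read back on restore.
   public final class PostgresStateContext {
       private final Map<String, String> splitOffsets = new ConcurrentHashMap<>();
   
       public void updateOffset(String splitId, String lsn) {
           splitOffsets.put(splitId, lsn);
       }
   
       public String lastOffset(String splitId) {
           return splitOffsets.get(splitId);
       }
   
       // Immutable snapshot of all split offsets, e.g. for a checkpoint.
       public Map<String, String> snapshotState() {
           return Map.copyOf(splitOffsets);
       }
   }
   ```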
   
   The column is a unique identifier, passed down as the result of the upstream primary-key query SQL, or null if there is none.
   
   When creating an event filter, a PostgresOffset implementation similar to BinlogOffset.getTimestamp and its builder is required. A class that can filter heartbeats by timestamp, analogous to the MySQL one, needs to be implemented using Postgres's Lsn; Lsn.getStringRepresentation should play the same role as BinlogOffset.toString does on the MySQL side.
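   
   A possible shape for that filter (a sketch under the assumption that each record carries its textual LSN and a commit timestamp; the class and method names are illustrative, not an existing API):
   
   ```java
   // Hypothetical timestamp-based start filter: drop every event
   // (including heartbeats) committed before the requested start time,
   // and remember the LSN of the first accepted event so it can be
   // reported the way BinlogOffset.toString is used on the MySQL side.
   public final class TimestampStartFilter {
       private final long startTimestampMs;
       private String firstAcceptedLsn; // textual LSN, e.g. "0/16B3748"
   
       public TimestampStartFilter(long startTimestampMs) {
           this.startTimestampMs = startTimestampMs;
       }
   
       // Returns true if the event should be emitted downstream.
       public boolean accept(String lsn, long commitTimestampMs) {
           if (commitTimestampMs < startTimestampMs) {
               return false; // before the requested start point
           }
           if (firstAcceptedLsn == null) {
               firstAcceptedLsn = lsn; // effective starting offset
           }
           return true;
       }
   
       public String startingLsn() {
           return firstAcceptedLsn;
       }
   }
   ```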
   
   There is a difficulty here: the LSN sequence number cannot be read back out of a DataInputDeserializer directly.
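   
   One workaround is for the split serializer to write the LSN explicitly, in a known position and format, so the matching deserializer can read it back symmetrically. The sketch below uses plain java.io streams to stand in for Flink's DataOutputSerializer/DataInputDeserializer, which expose the same DataOutput/DataInput interface (the class name LsnSerDe is hypothetical):
   
   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.DataInputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.io.UncheckedIOException;
   
   public final class LsnSerDe {
       // Write the LSN in its textual form; the reader must consume
       // fields in exactly the order the writer produced them.
       public static byte[] serialize(String lsn) {
           try {
               ByteArrayOutputStream bytes = new ByteArrayOutputStream();
               DataOutputStream out = new DataOutputStream(bytes);
               out.writeUTF(lsn);
               out.flush();
               return bytes.toByteArray();
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   
       public static String deserialize(byte[] data) {
           try {
               DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
               return in.readUTF();
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   }
   ```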
   
   3. Self-test verification:
   
   Run two pg-to-pg upsert tasks, one with schema and one schemaless, both with multiple splits (1,000,000 rows). Print the current split and the amount of data processed at the end of each split.
   
   Pause the task, resume from the timestamp checkpoint, and then verify that the split number is consistent with the timestamp.
   
   
   ### Use case
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes, I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

