Hi Alexander,

Thanks for starting this discussion.
Your analysis is right; the Kafka integration currently has these
inconsistency issues. Please share your thoughts on this; I'll share mine
here:

The external log system makes a lot of sense, but it currently has some
usability issues. The Kafka transaction mode confuses me: if Kafka
transactions are turned on, log records only become visible once the Flink
checkpoint commits, so why not just read the incremental data from the
FileStore? There is not much difference in their latency then.

So my idea is that the external log system should act as the binlog storage
of the store: its data should be written by the store itself, and it can
make the data visible in advance (before the checkpoint commits) to reduce
latency, so that Paimon really provides millisecond-level stream read
latency. There is still a lot of design and development work missing here.

Best,
Jingsong

On Tue, Mar 28, 2023 at 10:44 AM Alexander Sorokoumov
<[email protected]> wrote:
>
> Hi Paimon community,
>
> Congratulations on starting the incubating project!
>
> I have a question about preserving the write order and visibility between
> LogStore and FileStore when LogStore is configured as an external system,
> e.g., Kafka. To my knowledge, Paimon does not coordinate writes to
> LogStore and FileStore between multiple Flink jobs writing to the same
> table. The only check I am aware of is the Snapshot Isolation key-range
> check for concurrent snapshot commits.
>
> If the assumptions above are correct, then the record order in Kafka does
> not really correspond to the order in FileStore's snapshots. In addition,
> Kafka topics have different visibility guarantees under EXACTLY_ONCE
> compared to FileStore. As a motivating example, consider the following
> setup:
>
> 1. A Paimon table is configured with Kafka as an external log system. The
>    changelog topic t1 has 1 partition.
> 2. Two concurrent Flink jobs write non-intersecting key ranges to the
>    same table: Job1 writes keys {x1-x8}, Job2 writes keys {y1-y4}.
> 3. Both jobs write with the EXACTLY_ONCE delivery guarantee, so there are
>    2 Kafka transactions: txn1 for Job1 and txn2 for Job2.
> 4. The final record order in t1 is [x1, x2, x3, x4, y1, y2, x5, x6, x7,
>    y3, y4, x8].
> 5. Snapshot1 of Job1 is concurrent with Snapshot2 of Job2. Snapshot1
>    commits first, Snapshot2 commits second.
> 6. See this illustration for clarification:
>    https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/f6b763c70e0ddbfaabab5806ceaf6cb8c13c1fcd/concurrent-writers.png
>
> In this setup, the following anomalies are possible:
>
> 1. Consider a continuous streaming query submitted after Snapshot1
>    committed but before Snapshot2 committed. Such a query returns
>    Snapshot1 and then streams newer records. The snapshot will contain
>    records {x1-x8} and won't contain {y1-y4}. The streaming part tails
>    records after the last covered offset, i.e., after offset 12. As a
>    result, such a query won't ever return the records from txn2.
> 2. Consider a batch query submitted after Snapshot1 is committed but
>    before Snapshot2 is committed. Such a query would return {x1-x8}.
>    However, a following streaming query reading from offset 1 would
>    return {x1-x4} and then nothing further until Snapshot2 commits. This
>    would happen because the streaming query reads the Kafka topic under
>    READ_COMMITTED; as txn1 and txn2 overlap, the last stable offset is
>    pinned at txn2's first record, so txn1's interleaved records are not
>    visible until txn2 commits.
>
> I think I have some ideas about this, but I'm worried that the scope
> might be too large, and I'm not sure how important external log systems
> are to the Paimon roadmap.
>
> Best,
> Alexander
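
The trade-off Jingsong raises corresponds to a table option in the Paimon
(formerly Flink Table Store) documentation of that period. Below is a
minimal declaration sketch using Flink's Java Table API; it is not code
from this thread. The catalog name, warehouse path, schema, broker address,
and topic are made-up placeholders, and the option keys ('log.system',
'kafka.bootstrap.servers', 'kafka.topic', 'log.consistency') are recalled
from the docs of that era, so verify them against the Paimon version in
use:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class DeclareLogTable {
        public static void main(String[] args) {
            TableEnvironment env =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // A Paimon catalog must be current for the DDL below;
            // name, type, and warehouse path are illustrative.
            env.executeSql(
                    "CREATE CATALOG paimon WITH ("
                            + " 'type' = 'paimon',"
                            + " 'warehouse' = 'file:/tmp/paimon' )");
            env.executeSql("USE CATALOG paimon");
            env.executeSql(
                    "CREATE TABLE t ("
                            + "  k STRING,"
                            + "  v STRING,"
                            + "  PRIMARY KEY (k) NOT ENFORCED"
                            + ") WITH ("
                            + "  'log.system' = 'kafka',"
                            + "  'kafka.bootstrap.servers' = 'broker:9092',"
                            + "  'kafka.topic' = 't1',"
                            // 'transactional' (default): log records become
                            // visible only when the checkpoint commits.
                            // 'eventual': records are visible immediately,
                            // i.e. Jingsong's "visible in advance" path.
                            + "  'log.consistency' = 'transactional'"
                            + ")");
        }
    }

Under 'transactional' the log's visibility is tied to the checkpoint, which
is exactly why Jingsong asks what it buys over reading incremental data
from the FileStore.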
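
The interleaving in steps 2-5 of Alexander's setup can be reproduced with
plain kafka-clients, independent of Paimon. The sketch below is not
Paimon's writer; the broker address, topic, and transactional.ids are
placeholders. It only shows two transactional producers appending to t1's
single partition while both transactions remain open:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class OverlappingTxns {

        static KafkaProducer<String, String> newProducer(String txnId) {
            Properties p = new Properties();
            p.put("bootstrap.servers", "broker:9092"); // placeholder
            p.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            p.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            // One transactional.id per writer, as EXACTLY_ONCE requires.
            p.put("transactional.id", txnId);
            KafkaProducer<String, String> producer = new KafkaProducer<>(p);
            producer.initTransactions();
            return producer;
        }

        static void send(KafkaProducer<String, String> p, String... keys) {
            for (String k : keys) {
                // Everything goes to partition 0: t1 has a single partition.
                p.send(new ProducerRecord<>("t1", 0, k, k));
            }
            p.flush(); // flush each burst so the appends interleave as in step 4
        }

        public static void main(String[] args) {
            KafkaProducer<String, String> job1 = newProducer("txn1"); // x keys
            KafkaProducer<String, String> job2 = newProducer("txn2"); // y keys
            job1.beginTransaction();
            job2.beginTransaction();

            // Reproduces the offset order of step 4:
            // [x1, x2, x3, x4, y1, y2, x5, x6, x7, y3, y4, x8]
            send(job1, "x1", "x2", "x3", "x4");
            send(job2, "y1", "y2");
            send(job1, "x5", "x6", "x7");
            send(job2, "y3", "y4");
            send(job1, "x8");

            // Snapshot1 commits first, so txn1 commits while txn2 is still
            // open. The last stable offset is now pinned at y1 (txn2's first
            // record): READ_COMMITTED readers see x1-x4 but nothing beyond.
            job1.commitTransaction();

            // Only when Snapshot2 commits does the rest become visible.
            job2.commitTransaction();

            job1.close();
            job2.close();
        }
    }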
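
Anomaly 2 follows from READ_COMMITTED fetch semantics: a consumer is only
given records below the partition's last stable offset (LSO), and an open
transaction pins the LSO at its first record. A minimal reader sketch,
again with plain kafka-clients rather than Paimon's log reader; the broker
address and group id are placeholders:

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ReadCommittedTail {
        public static void main(String[] args) {
            Properties p = new Properties();
            p.put("bootstrap.servers", "broker:9092"); // placeholder
            p.put("group.id", "log-tail-demo");        // placeholder
            p.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            p.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            p.put("auto.offset.reset", "earliest");
            // The setting behind anomaly 2: only records below the LSO are
            // ever returned to the application.
            p.put("isolation.level", "read_committed");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p)) {
                consumer.subscribe(List.of("t1"));
                while (true) {
                    // With txn1 committed and txn2 still open, poll() returns
                    // at most x1-x4; y1..x8 stay invisible until txn2 resolves.
                    for (ConsumerRecord<String, String> r :
                            consumer.poll(Duration.ofSeconds(1))) {
                        System.out.printf("offset=%d key=%s%n",
                                r.offset(), r.key());
                    }
                }
            }
        }
    }

Run against the topic as the producer sketch leaves it between the two
commits, the loop prints x1-x4 and then stalls, even though txn1 has
already committed; everything from y1 onward arrives only once txn2
commits.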
