Hi Alexander and Jingsong,

Sorry for joining the discussion late. I think @Alexander raises an important question: the data consistency between the log system and the file store in Paimon. In fact, it confuses me too.
In addition to multiple jobs writing to the same table, even a single job writing to a table can leave different data in the log system and the file store after a job failover. As @Jingsong mentioned above, the idea is to use the `external log system as the binlog storage of the store`, but the file store won't recover from the log system. It recovers from the `checkpoint` of the upstream streaming processing system, which causes a data gap between the log system and the file store.

So my questions are: do we really need a log system? If so, is it necessary to ensure data consistency between the log system and the file store, for example by rolling back the uncommitted data in the log system according to the file store? And if users don't care about the `wrong data` in the log system, should Paimon instead support reading 'uncommitted' data in the future, to achieve millisecond-level delays without a log system?

Hope to hear from you, thanks.

Best,
Shammon FY

On Tue, Mar 28, 2023 at 3:27 PM Jingsong Li <[email protected]> wrote:

> Hi Alexander,
>
> Thanks for starting this discussion.
>
> Your analysis is right: currently the Kafka integration has such
> inconsistency issues.
> You can share your thoughts on this.
>
> I'll share my thoughts here:
>
> The external log system makes a lot of sense, but currently there are
> some usability issues.
>
> The Kafka transaction confuses me. If the Kafka transaction is turned
> on, then why not just read the incremental data from the FileStore?
> There is not much difference in their latency.
>
> So my idea is to use the external log system as the binlog storage of
> the store: its data should be written by the store, and it can make
> the data visible in advance to reduce the latency, so that Paimon
> really provides millisecond-level stream read latency.
> There is still a lot of design and advancement missing here.
>
> Best,
> Jingsong
>
> On Tue, Mar 28, 2023 at 10:44 AM Alexander Sorokoumov
> <[email protected]> wrote:
> >
> > Hi Paimon community,
> >
> > Congratulations on starting the incubating project!
> >
> > I have a question about preserving the write order and visibility
> > between LogStore and FileStore when LogStore is configured as an
> > external system, e.g., Kafka. To my knowledge, Paimon does not
> > coordinate writes to LogStore and FileStore between multiple Flink
> > jobs writing to the same table. The only check I am aware of is the
> > Snapshot Isolation key range check for concurrent snapshot commits.
> >
> > If the assumptions above are correct, then the record order in Kafka
> > does not really correspond to the order in FileStore's snapshots. In
> > addition, Kafka topics have different visibility guarantees under
> > EXACTLY_ONCE compared to FileStore. As a motivating example, consider
> > the following setup:
> >
> > 1. A Paimon table configured with Kafka as an external log system.
> >    The changelog topic t1 has 1 partition.
> > 2. There are 2 concurrent Flink jobs writing to the same table with
> >    non-intersecting key ranges. Job1 writes keys {x1-x8}, Job2 writes
> >    keys {y1-y4}.
> > 3. Both jobs write with the EXACTLY_ONCE delivery guarantee, so there
> >    are 2 Kafka transactions: txn1 for Job1 and txn2 for Job2.
> > 4. The final record order in t1 is [x1, x2, x3, x4, y1, y2, x5, x6,
> >    x7, y3, y4, x8].
> > 5. Snapshot1 of Job1 is concurrent with Snapshot2 of Job2. Snapshot1
> >    will commit first, Snapshot2 will commit second.
> > 6. See the illustration for clarification:
> >    https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/f6b763c70e0ddbfaabab5806ceaf6cb8c13c1fcd/concurrent-writers.png
> >
> > In this setup, the following anomalies are possible:
> >
> > 1. Consider a continuous streaming query submitted after Snapshot 1 is
> >    committed and before Snapshot 2 is committed. Such a query returns
> >    Snapshot 1 and then streams newer records. The snapshot will
> >    contain records {x1-x8} and won't contain {y1-y4}. The streaming
> >    results would tail records after the last covered offset, so after
> >    12. As a result, such a query won't ever return records from txn2.
> > 2. Consider a batch query submitted after Snapshot 1 is committed and
> >    before Snapshot 2 is committed. Such a query would return {x1-x8}.
> >    However, a following streaming query reading from offset 1 would
> >    not return any results until Snapshot 2 commits. This would happen
> >    because the streaming query would read from the Kafka topic under
> >    READ_COMMITTED. As txn1 and txn2 overlap, txn1 records are not
> >    visible until txn2 commits.
> >
> > I think I have some ideas about this, but I'm worried the scope might
> > be too large, and I'm not sure how important external log systems are
> > to the Paimon roadmap.
> >
> > Best,
> > Alexander
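
To make the READ_COMMITTED anomaly in the thread easy to reproduce outside of Paimon, here is a minimal sketch using plain Kafka clients. The topic name, bootstrap address, and transactional ids are assumptions for illustration, and the two producers stand in for the two Flink jobs; this is not Paimon's actual write path. After txn1 commits while txn2 is still open, a read_committed consumer is capped at the last stable offset (y1's offset), so txn1's records written after y1 stay invisible until txn2 resolves.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReadCommittedVisibilityDemo {

    // Hypothetical settings for illustration only.
    private static final String TOPIC = "t1";
    private static final String BOOTSTRAP = "localhost:9092";

    public static void main(String[] args) {
        KafkaProducer<String, String> job1 = producer("txn1"); // writes x*
        KafkaProducer<String, String> job2 = producer("txn2"); // writes y*

        job1.initTransactions();
        job2.initTransactions();
        job1.beginTransaction();
        job2.beginTransaction();

        // Interleaving from the example: x1-x4, y1, y2, x5-x7, y3, y4, x8.
        send(job1, "x1", "x2", "x3", "x4");
        send(job2, "y1", "y2");
        send(job1, "x5", "x6", "x7");
        send(job2, "y3", "y4");
        send(job1, "x8");

        // txn1 commits (Snapshot1); txn2 stays open (Snapshot2 not committed yet).
        job1.commitTransaction();

        // A read_committed consumer only reads up to the last stable offset,
        // which is pinned at y1 while txn2 is open. It sees x1-x4 at most and
        // none of txn1's later records until txn2 commits or aborts.
        Properties cp = new Properties();
        cp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP);
        cp.put(ConsumerConfig.GROUP_ID_CONFIG, "visibility-demo");
        cp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        cp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        cp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        cp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp)) {
            consumer.subscribe(Collections.singletonList(TOPIC));
            ConsumerRecords<String, String> visible = consumer.poll(Duration.ofSeconds(5));
            visible.forEach(r -> System.out.printf("offset=%d key=%s%n", r.offset(), r.key()));
        }

        job2.commitTransaction(); // only now do x5-x8 and y1-y4 become visible
        job1.close();
        job2.close();
    }

    private static KafkaProducer<String, String> producer(String transactionalId) {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP);
        p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(p);
    }

    private static void send(KafkaProducer<String, String> producer, String... keys) {
        for (String key : keys) {
            producer.send(new ProducerRecord<>(TOPIC, 0, key, "payload"));
        }
        producer.flush(); // force the interleaving shown in the example
    }
}
```

The first anomaly in the thread is the mirror image: a streaming read that resumes from the last offset covered by Snapshot 1 (offset 12 in the example) seeks past y1-y4 and therefore never returns txn2's records.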
