Hi Paimon community,

Congratulations on starting the incubating project!

I have a question about preserving the write order and visibility between
LogStore and FileStore when LogStore is configured as an external system,
e.g., Kafka. To my knowledge, Paimon does not coordinate writes to LogStore
and FileStore between multiple Flink jobs writing to the same table. The
only check I am aware of is the snapshot isolation key-range check for
concurrent snapshot commits.

If the assumptions above are correct, then the record order in Kafka does
not necessarily correspond to the order in FileStore's snapshots. In
addition, under EXACTLY_ONCE the Kafka topic has different visibility
guarantees than FileStore does. As a motivating example, consider the
following setup (sketched in code after the list):

   1. A Paimon table configured with Kafka as the external log system. The
   changelog topic t1 has a single partition.
   2. Two concurrent Flink jobs write non-intersecting key ranges to the
   same table: Job1 writes keys {x1-x8}, Job2 writes keys {y1-y4}.
   3. Both jobs write with the EXACTLY_ONCE delivery guarantee, so there are
   2 Kafka transactions - txn1 for Job1 and txn2 for Job2.
   4. The final record order in t1 is [x1, x2, x3, x4, y1, y2, x5, x6, x7,
   y3, y4, x8].
   5. Snapshot1 of Job1 is concurrent with Snapshot2 of Job2. Snapshot1
   commits first, Snapshot2 commits second.
   6. See the illustration for clarification:
https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/f6b763c70e0ddbfaabab5806ceaf6cb8c13c1fcd/concurrent-writers.png

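For concreteness, this is roughly how I imagine the table being declared.
It is only a sketch: the catalog name, warehouse path and bootstrap address
are placeholders, and I am going from the documented 'log.system' options,
so please correct me if I misread them.

  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

  public class SetupSketch {
      public static void main(String[] args) {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          // Checkpointing drives the EXACTLY_ONCE Kafka transactions: each job
          // opens one transaction per checkpoint (txn1 for Job1, txn2 for Job2).
          env.enableCheckpointing(10_000L);
          StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

          // Placeholder catalog; the warehouse path is made up.
          tEnv.executeSql(
              "CREATE CATALOG paimon WITH ("
                  + " 'type' = 'paimon',"
                  + " 'warehouse' = 'file:///tmp/warehouse')");
          tEnv.executeSql("USE CATALOG paimon");

          // FileStore-backed table with Kafka as the external log system;
          // the changelog topic t1 with a single partition is created separately.
          tEnv.executeSql(
              "CREATE TABLE t ("
                  + "  k STRING,"
                  + "  v STRING,"
                  + "  PRIMARY KEY (k) NOT ENFORCED"
                  + ") WITH ("
                  + "  'log.system' = 'kafka',"
                  + "  'kafka.bootstrap.servers' = 'localhost:9092',"
                  + "  'kafka.topic' = 't1')");

          // Job1 and Job2 would each run their own INSERT INTO t ... pipeline,
          // writing the disjoint key ranges {x1-x8} and {y1-y4}.
      }
  }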

In this setup, the following anomalies are possible:

   1. Consider a continuous streaming query submitted after Snapshot 1
   committed and before Snapshot 2 committed. Such a query returns Snapshot 1
   and then streams newer records. The snapshot will contain records {x1-x8}
   and none of {y1-y4}. The streaming part would tail records after the last
   covered offset, i.e., after the offset of x8. As a result, such a query
   won't ever return the records from txn2 (see the first sketch after this
   list).
   2. Consider a batch query submitted after Snapshot 1 is committed and
   before Snapshot 2 is committed. Such a query would return {x1-x8}.
   However, a streaming query that then reads the topic from offset 1 under
   READ_COMMITTED cannot advance past the last stable offset, which is pinned
   at the first record of the still-open txn2. Because txn1 and txn2
   interleave, the later txn1 records {x5-x8}, which the batch query already
   returned, stay invisible until txn2 commits (see the consumer sketch after
   this list).
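
To make anomaly 1 concrete, the query I have in mind is simply a streaming
SELECT on the table. The comments describe the behaviour as I understand it,
so this is a sketch rather than verified code:

  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.TableEnvironment;

  public class StreamingReadSketch {
      public static void main(String[] args) {
          TableEnvironment tEnv =
              TableEnvironment.create(EnvironmentSettings.inStreamingMode());
          // Assumes the catalog and table from the setup sketch above.
          tEnv.executeSql("USE CATALOG paimon");

          // With an external Kafka log, a streaming read first serves the latest
          // FileStore snapshot (Snapshot 1, i.e. {x1-x8}) and then tails the topic
          // from the offsets recorded in that snapshot, i.e. after x8. The
          // interleaved lower-offset records y1-y4 from txn2 are skipped and, as
          // far as I can tell, never emitted by this query.
          tEnv.executeSql("SELECT * FROM t").print();
      }
  }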

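And for anomaly 2, a bare KafkaConsumer that mimics what the streaming side
would see under READ_COMMITTED (the bootstrap address and group id are
placeholders):

  import java.time.Duration;
  import java.util.Collections;
  import java.util.Properties;

  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.TopicPartition;
  import org.apache.kafka.common.serialization.StringDeserializer;

  public class ReadCommittedProbe {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(ConsumerConfig.GROUP_ID_CONFIG, "visibility-probe");
          props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    StringDeserializer.class.getName());
          props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    StringDeserializer.class.getName());
          // The isolation level a transactional log reader has to use.
          props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

          try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
              TopicPartition tp = new TopicPartition("t1", 0);
              consumer.assign(Collections.singletonList(tp));
              consumer.seek(tp, 1L); // "offset 1" from the example above

              // While txn2 is open, the broker pins the last stable offset at
              // txn2's first record (y1). This poll can surface at most the few
              // txn1 records before y1 and nothing at or beyond it, in particular
              // none of {x5-x8} that the batch query already returned.
              ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
              System.out.println("visible before txn2 commits: " + records.count());
          }
      }
  }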


I think I have some ideas about this, but I'm worried the scope might be
too large, and I'm not sure how important external log systems are to the
Paimon roadmap.

Best,
Alexander
