Hey Ioannis,
I am also relatively new to Apache Paimon, so I may be wrong. There are two state representations in Paimon - FileStore and LogStore. FileStore is the LSM tree - a set of immutable data files organized via the compaction process, with Snapshot and Manifest metadata files that allow point-in-time and streaming queries[1]. LogStore has 3 possible implementations:

1. No LogStore at all (default)[2]. If you issue a streaming query here, the changelog is produced by reading the available snapshots one after another and computing a diff. This is an expensive approach, as it requires a stateful Normalization Node that implements the diffing logic.
2. LogStore via changelog files. There are 3 different ways[3][4][5] to implement this, each with its own trade-offs. With any of them, the changelog files are available for consumption, so streaming queries become considerably cheaper.
3. LogStore via an external system, currently only Kafka. Streaming queries read records from the topic instead of the changelog files on DFS. Batch queries still read only from the LSM tree on DFS; they do not access Kafka. There are multiple correctness issues with an external LogStore that, I believe, can't be fixed without making the LogStore the source of truth.

FWIW, I do not propose using Kafka as the default LogStore. Apache Paimon will, by default, produce the changelog files and store them on DFS in the same way as it already does when changelog-producer != none[2]. However, using Kafka as the LogStore enables a variety of new features, like:

1. Support for log consumers reading directly from Kafka.
2. An optional weaker consistency model / better scalability potential, since Kafka brokers become transaction coordinators.
3. Being able to create FileStores for existing topics.
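To make these three modes concrete, here is a minimal sketch of the corresponding table definitions via Flink's Table API. It assumes an already-configured Paimon catalog is in use; the option names follow the Paimon docs, but please treat the table names and exact values as illustrative:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LogStoreModes {
    public static void main(String[] args) {
        TableEnvironment env =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // 1. No LogStore (default): streaming reads need the stateful
        //    Normalization Node to diff successive snapshots.
        env.executeSql(
            "CREATE TABLE t_none (k INT PRIMARY KEY NOT ENFORCED, v STRING) "
                + "WITH ('changelog-producer' = 'none')");

        // 2. Changelog files on DFS; 'input' is one of the three variants
        //    ('lookup' and 'full-compaction' are the others).
        env.executeSql(
            "CREATE TABLE t_files (k INT PRIMARY KEY NOT ENFORCED, v STRING) "
                + "WITH ('changelog-producer' = 'input')");

        // 3. External LogStore in Kafka: streaming queries read the topic,
        //    batch queries still read only the LSM tree on DFS.
        env.executeSql(
            "CREATE TABLE t_kafka (k INT PRIMARY KEY NOT ENFORCED, v STRING) "
                + "WITH ('log.system' = 'kafka', "
                + "'kafka.bootstrap.servers' = 'localhost:9092', "
                + "'kafka.topic' = 't_kafka_log')");
    }
}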
Best,
Alex

1. https://paimon.apache.org/docs/master/concepts/file-layouts/
2. https://paimon.apache.org/docs/master/concepts/primary-key-table/#none
3. https://paimon.apache.org/docs/master/concepts/primary-key-table/#input
4. https://paimon.apache.org/docs/master/concepts/primary-key-table/#lookup
5. https://paimon.apache.org/docs/master/concepts/primary-key-table/#full-compaction

On Apr 5, 2023 at 23:28:42, Ioannis Polyzos <[email protected]> wrote:

> Hi guys,
> this thread is really interesting. I'm new to Paimon and still trying to wrap my head around it and how it differentiates from Apache Hudi, for example.
> But my understanding so far is that I can use it as an underlying store along with Flink to get streaming database capabilities - with really good performance due to its LSM architecture. (Let me know if this statement is not correct.)
> Basically, my source of truth would be Kafka (or some other system like Pulsar or Redpanda). The FileStore then makes a lot of sense to me for storing the data, but why would I need the LogStore, and what would be the use cases?
> I.e., in my mind it would be like reading data from Kafka only to write it again to Kafka. In this proposal, what happens if I don't want to use the LogStore? And why use it at all?
> Please excuse me if these questions are not valid - I'm still trying to wrap my head around the value proposition here.
>
> Thanks
>
> On Wed, Apr 5, 2023 at 11:31 PM Alexander Sorokoumov <[email protected]> wrote:
>
> > Hi Jingsong, Shammon,
> >
> > Please find below my thoughts about external log systems, their value, and a sketch of a proposal for what the systematic fix should look like. I will happily write a more detailed proposal (PIP?) if the community finds this sketch attractive.
> >
> > This answer turned out to be rather long, so to sum up my points, I think that:
> >
> > 1. External log systems enable a variety of valuable use cases, and
> > 2. The way to correctly enable them is to redesign the write path such that the LogStore becomes the source of truth and the coordinator on the write path.
> >
> > *External log systems enable a variety of valuable use cases.*
> >
> > Here are the reasons why I think external log systems are valuable:
> >
> > 1. With an explicitly stored log, we don't need the normalization node if the downstream consumer does not need retractions or the stream already contains them[1]. Having this option enables more flexibility around performance and functionality.
> > 2. External log systems enable external consumers that can read the changelog without Flink/Spark jobs. Integrations with external sinks are advantageous for Paimon as a streaming data lake.
> > 3. Finally, this is an opportunity for Paimon to support different isolation/consistency models via pluggable external log systems or coordinators. The rest of this message elaborates on this point.
> >
> > *The way to correctly enable them is to redesign the write path such that the LogStore becomes the source of truth and the coordinator on the write path.*
> >
> > First, let's talk about the root cause of the problem at hand. Paimon's dual write path coordinates concurrent writes in the FileStore commit phase. FileStoreCommit creates a totally ordered sequence (by monotonically increasing snapshot IDs) out of concurrently running non-conflicting snapshots by[2]:
> >
> > 1. Observing the concurrently committed snapshots, then
> > 2. Verifying that the current job can proceed because there is no conflict.
> >
> > This way, FileStoreCommit orders concurrent snapshots one after another if it does not find conflicting key ranges, or aborts conflicting snapshots if it does. As all changes that belong to a single snapshot are grouped together in, well, that snapshot, FileStoreCommit effectively reorders the write sequence compared to the "original" write order that happened in the write phase. In the illustration below, the LogStore has the records in the original order, which does not agree with the order in the FileStore:
> >
> > https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/f6b763c70e0ddbfaabab5806ceaf6cb8c13c1fcd/concurrent-writers.png
> >
> > In the log order, writes from txn1 and txn2 interleave. In the FileStore order, all writes from txn1 (grey) happened before all writes from txn2 (red).
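> > To make the commit protocol above concrete, here is a minimal sketch in plain Java. The types are hypothetical - this is not Paimon's actual FileStoreCommitImpl - but it shows the observe/verify/CAS sequence and why the snapshot order may differ from the log order:
> >
> > import java.util.List;
> > import java.util.concurrent.atomic.AtomicReference;
> >
> > final class SnapshotChainSketch {
> >     record KeyRange(String min, String max) {
> >         boolean overlaps(KeyRange o) {
> >             return min.compareTo(o.max) <= 0 && o.min.compareTo(max) <= 0;
> >         }
> >     }
> >
> >     record Snapshot(long id, Snapshot parent, List<KeyRange> writtenRanges) {}
> >
> >     private final AtomicReference<Snapshot> latest =
> >         new AtomicReference<>(new Snapshot(0L, null, List.of()));
> >
> >     Snapshot latest() { return latest.get(); }
> >
> >     // Returns true if the snapshot committed; false means a conflicting
> >     // concurrent snapshot won and this one must be aborted.
> >     boolean commit(Snapshot observedBase, List<KeyRange> writtenRanges) {
> >         while (true) {
> >             Snapshot head = latest.get();
> >             // 1. Observe every snapshot committed since our base...
> >             for (Snapshot s = head; s != null && s != observedBase; s = s.parent()) {
> >                 // 2. ...and verify that none of them conflicts with us.
> >                 for (KeyRange ours : writtenRanges) {
> >                     for (KeyRange theirs : s.writtenRanges()) {
> >                         if (ours.overlaps(theirs)) {
> >                             return false; // conflicting key ranges -> abort
> >                         }
> >                     }
> >                 }
> >             }
> >             // No conflict: CAS-install the next totally ordered snapshot.
> >             Snapshot next = new Snapshot(head.id() + 1, head, writtenRanges);
> >             if (latest.compareAndSet(head, next)) {
> >                 return true;
> >             }
> >             // Lost the race to another committer; re-check the new head.
> >         }
> >     }
> > }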
> > This is generally too late for the external LogStore to change the write order, as it has been appending records during the write phase; it can only abort or commit. *As a result, FileStore snapshots can become inconsistent with the writes in the LogStore.* In other words, a reader that consumes the LogStore up to the topic offset specified in a FileStore snapshot might not be able to reproduce that snapshot.
> >
> > To make my argument more specific, for the next couple of paragraphs I will assume that the external logging system is Kafka, configured with EXACTLY_ONCE.
> >
> > If we look at Apache Paimon's write path as it is right now, there are 3 conceptual roles:
> >
> > 1. FileStore writer - the current "source of truth".
> > 2. LogStore writer - external, or optionally derived[3] from FileStore snapshots when changelog-producer != input.
> > 3. Write coordinator - required for conflict resolution. This role currently requires the external Catalog Lock (disabled by default, configured by lock.enabled; the only available implementation is HiveCatalogLock[4]) to guarantee Snapshot Isolation (SI); without this lock, the FileStore writer can't atomically (CAS) install a new snapshot without the risk of overwriting a concurrently committed one. The coordinator is currently combined with the FileStore writer.
> >
> > https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/83a8b3299cefe6a5f0d099f0e0e074b73594bfbc/paimon-write-path.png
> >
> > I propose to change these roles such that:
> >
> > - The write order in the LogStore is "the source of truth". The default implementation will write the changelog and the set of keys needed to resolve SI conflicts.
> > - The write coordinator is implemented in the LogStore instead of the FileStore. There are 2 implementations:
> >   - The "internal changelog" (default), which guarantees Snapshot Isolation via an optional Hive Catalog Lock.
> >   - External logging systems, which also act as coordinators and provide guarantees specific to that system. For example, with Kafka as the logging system, Kafka EXACTLY_ONCE transactions determine the consistency semantics.
> > - In turn, FileStore writers read the changelog and "materialize" it into snapshots in the background. The FileStore writes can be done either in a blocking way or as a standalone job, similar to compaction.
> > - In addition, we will need to adjust the read path. It will use the latest "materialized" snapshot as a base and optionally apply the not-yet-materialized changelog on top of it (see the sketch below).
> >
> > https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/83a8b3299cefe6a5f0d099f0e0e074b73594bfbc/proposed-write-path.png
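> > Here is a minimal sketch of that proposed read path, again with hypothetical types made up for illustration: the reader takes the latest materialized snapshot as the base and applies the log records past the snapshot's covered offset.
> >
> > import java.util.HashMap;
> > import java.util.List;
> > import java.util.Map;
> >
> > final class ProposedReadPathSketch {
> >     record LogRecord(long offset, String key, String value) {}
> >
> >     // A materialized snapshot remembers the log offset it covers, so the
> >     // reader knows exactly where the not-yet-materialized tail begins.
> >     record MaterializedSnapshot(long id, long coveredLogOffset, Map<String, String> state) {}
> >
> >     // Base: the latest snapshot materialized in the background.
> >     // Tail: log records appended after snapshot.coveredLogOffset.
> >     static Map<String, String> read(MaterializedSnapshot snapshot, List<LogRecord> log) {
> >         Map<String, String> result = new HashMap<>(snapshot.state());
> >         for (LogRecord r : log) {
> >             if (r.offset() > snapshot.coveredLogOffset()) {
> >                 result.put(r.key(), r.value()); // apply the unmaterialized tail
> >             }
> >         }
> >         return result;
> >     }
> > }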
> > With this change, we achieve several interesting properties, for example:
> >
> > - Writes in the LogStore are always consistent with FileStore snapshots (as in, FileStore snapshots become actual snapshots of the LogStore at a certain offset), with both the internal and external logging systems. This property eliminates a whole class of anomalies caused by the FileStore and LogStore not agreeing on the write order and visibility.
> > - The actual consistency guarantees (the consistency model of the distributed system) offered by Apache Paimon depend on the LogStore implementation. While the default LogStore implementation still offers SI, the Kafka-based implementation will offer weaker consistency guarantees that might be useful for cases with higher contention.
> > - We can use Kafka as the external log system and the coordinator to enable non-Flink/Spark consumers that read directly from Kafka.
> > - Even though I talked about Kafka here, it will be easier to plug in other external logging systems, allowing other logging sinks and consistency models.
> >
> > Please let me know what you think about it. I am aware that this sketch of a proposal implies quite a redesign of the write and read paths, but, in my opinion, there is no easier way to solve the consistency issues between the FileStore and external LogStores and make Apache Paimon's consistency model extensible.
> >
> > 1. https://paimon.apache.org/docs/master/concepts/primary-key-table/#none
> > 2. https://github.com/apache/incubator-paimon/blob/4a56565ea9bd30efabefd09734432d34c48aff22/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java#L506C15-L514
> > 3. https://paimon.apache.org/docs/master/concepts/primary-key-table/#changelog-producers
> > 4. https://github.com/apache/incubator-paimon/blob/4a56565ea9bd30efabefd09734432d34c48aff22/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java#L44
> >
> > Best,
> > Alex
> >
> > On Mar 31, 2023 at 04:49:15, Shammon FY <[email protected]> wrote:
> >
> > > Hi Alexander and Jingsong
> > >
> > > Sorry for joining the discussion late. I think @Alexander raised an important question: data consistency between the log system and the file store in Paimon. In fact, it also confuses me.
> > >
> > > Beyond multiple jobs writing to the same table, even when a single job writes to a table, there will be different data in the log system and the file store after the job fails over.
> > >
> > > As @Jingsong mentioned above, the `external log system is the binlog storage of the store`, but the file store won't recover from the log system. It recovers from the `checkpoint` in the upstream stream-processing system, which will cause a data gap between the log system and the file store.
> > >
> > > So my question is: do we really need a log system? If so, is it necessary to ensure data consistency between the log system and the file store? For example, by rolling back the uncommitted data in the log system according to the file store?
> > >
> > > If users don't care about the `wrong data` in the log system, should Paimon support reading 'uncommitted' data in the future, to support millisecond-level delays without a log system?
> > >
> > > Hope to hear from you, thanks
> > >
> > > Best,
> > > Shammon FY
> > >
> > > On Tue, Mar 28, 2023 at 3:27 PM Jingsong Li <[email protected]> wrote:
> > >
> > > > Hi Alexander,
> > > >
> > > > Thanks for starting this discussion.
> > > >
> > > > Your analysis is right; currently, the Kafka integration has such inconsistency issues.
> > > >
> > > > You shared your thoughts on this, so I'll share mine here:
> > > >
> > > > The external log system makes a lot of sense, but currently there are some usability issues.
> > > >
> > > > The Kafka transaction confuses me. If the Kafka transaction is turned on, then why not just read the incremental data from the FileStore? There is not much difference in their latency.
> > > >
> > > > So my idea is to use the external log system as the binlog storage of the store: its data should be written into it by the store, and it can make the data visible in advance to reduce the latency, so that Paimon really provides millisecond-level stream read latency. There is still a lot of design and advancement missing here.
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > > > On Tue, Mar 28, 2023 at 10:44 AM Alexander Sorokoumov <[email protected]> wrote:
> > > >
> > > > > Hi Paimon community,
> > > > >
> > > > > Congratulations on starting the incubating project!
> > > > > I have a question about preserving the write order and visibility between the LogStore and FileStore when the LogStore is configured as an external system, e.g., Kafka. To my knowledge, Paimon does not coordinate writes to the LogStore and FileStore between multiple Flink jobs writing to the same table. The only check I am aware of is the Snapshot Isolation key-range check for concurrent snapshot commits.
> > > > >
> > > > > If the assumptions above are correct, then the record order in Kafka does not really correspond to the order in the FileStore's snapshots. In addition, Kafka topics have different visibility guarantees under EXACTLY_ONCE, compared to the FileStore. As a motivating example, consider the following setup:
> > > > >
> > > > > 1. A Paimon table configured with Kafka as an external log system. The changelog topic t1 has 1 partition.
> > > > > 2. There are 2 concurrent Flink jobs writing non-intersecting key ranges to the same table. Job1 writes keys {x1-x8}, Job2 writes keys {y1-y4}.
> > > > > 3. Both jobs write with the EXACTLY_ONCE delivery guarantee, so there are 2 Kafka transactions - txn1 for Job1 and txn2 for Job2.
> > > > > 4. The final record order in t1 is [x1, x2, x3, x4, y1, y2, x5, x6, x7, y3, y4, x8].
> > > > > 5. Snapshot1 of Job1 is concurrent with Snapshot2 of Job2. Snapshot1 will commit first, Snapshot2 will commit second.
> > > > > 6. See the illustration for clarification - https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/f6b763c70e0ddbfaabab5806ceaf6cb8c13c1fcd/concurrent-writers.png
> > > > >
> > > > > In this setup, the following anomalies are possible:
> > > > >
> > > > > 1. Consider a continuous streaming query submitted after Snapshot1 committed, but before Snapshot2 committed. Such a query returns Snapshot1 and then streams newer records. The snapshot will contain records {x1-x8} and won't contain {y1-y4}. The streaming results would tail records after the last covered offset, so after 12. As a result, such a query won't ever return the records from txn2.
> > > > > 2. Consider a batch query submitted after Snapshot1 is committed, but before Snapshot2 is committed. Such a query would return {x1-x8}. However, a subsequent streaming query reading from offset 1 would not return any results until Snapshot2 commits. This happens because the streaming query reads from the Kafka topic under READ_COMMITTED. As txn1 and txn2 overlap, txn1's records are not visible until txn2 commits.
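> > > > > The visibility part of anomaly 2 can be reproduced with the stock Kafka clients. Here is a minimal sketch (the topic, keys, and configs are illustrative) of two interleaved EXACTLY_ONCE transactions and a READ_COMMITTED consumer that cannot read past the last stable offset until both commit:
> > > > >
> > > > > import java.time.Duration;
> > > > > import java.util.List;
> > > > > import java.util.Properties;
> > > > > import org.apache.kafka.clients.consumer.ConsumerConfig;
> > > > > import org.apache.kafka.clients.consumer.ConsumerRecords;
> > > > > import org.apache.kafka.clients.consumer.KafkaConsumer;
> > > > > import org.apache.kafka.clients.producer.KafkaProducer;
> > > > > import org.apache.kafka.clients.producer.ProducerConfig;
> > > > > import org.apache.kafka.clients.producer.ProducerRecord;
> > > > > import org.apache.kafka.common.serialization.StringDeserializer;
> > > > > import org.apache.kafka.common.serialization.StringSerializer;
> > > > >
> > > > > public class InterleavedTxnDemo {
> > > > >     static KafkaProducer<String, String> producer(String txnId) {
> > > > >         Properties p = new Properties();
> > > > >         p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > > > >         p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txnId);
> > > > >         p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> > > > >         p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> > > > >         KafkaProducer<String, String> producer = new KafkaProducer<>(p);
> > > > >         producer.initTransactions();
> > > > >         return producer;
> > > > >     }
> > > > >
> > > > >     public static void main(String[] args) {
> > > > >         KafkaProducer<String, String> job1 = producer("txn1");
> > > > >         KafkaProducer<String, String> job2 = producer("txn2");
> > > > >
> > > > >         job1.beginTransaction();
> > > > >         job2.beginTransaction();
> > > > >         // Interleave the writes so the transactions overlap in the log.
> > > > >         for (String k : List.of("x1", "x2", "x3", "x4")) job1.send(new ProducerRecord<>("t1", k, k));
> > > > >         for (String k : List.of("y1", "y2")) job2.send(new ProducerRecord<>("t1", k, k));
> > > > >         for (String k : List.of("x5", "x6", "x7")) job1.send(new ProducerRecord<>("t1", k, k));
> > > > >         // txn1 commits first, but txn2 (whose first record sits at an
> > > > >         // earlier offset) is still open, so the last stable offset stays
> > > > >         // at txn2's first record and blocks READ_COMMITTED consumers.
> > > > >         job1.commitTransaction();
> > > > >
> > > > >         Properties c = new Properties();
> > > > >         c.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > > > >         c.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
> > > > >         c.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> > > > >         c.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
> > > > >         c.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
> > > > >         c.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
> > > > >         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
> > > > >             consumer.subscribe(List.of("t1"));
> > > > >             // Only records before txn2's first record (x1-x4) show up here.
> > > > >             ConsumerRecords<String, String> before = consumer.poll(Duration.ofSeconds(5));
> > > > >             System.out.println("visible before txn2 commits: " + before.count());
> > > > >             job2.commitTransaction(); // the remaining records become visible
> > > > >             System.out.println("visible after txn2 commits: " + consumer.poll(Duration.ofSeconds(5)).count());
> > > > >         }
> > > > >     }
> > > > > }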
> > > > >
> > > > > I think I have some ideas about this, but I'm worried the scope might be too large, and I'm not sure how important external log systems are to the Paimon roadmap.
> > > > >
> > > > > Best,
> > > > > Alexander