Thanks very much, Alex! I understand your proposal. Very detailed, very good!
I need some time to digest it.

Best,
Jingsong

On Fri, Apr 7, 2023 at 5:46 AM Alexander Sorokoumov <[email protected]> wrote:

Hey Ioannis,

I am also relatively new to Apache Paimon, so I may be wrong. There are 2 state representations in Paimon - FileStore and LogStore. FileStore is the LSM tree - a set of immutable data files organized via the compaction process, with Snapshot and Manifest metadata files that allow point-in-time and streaming queries[1]. LogStore has 3 possible implementations:

1. No LogStore at all (default)[2]. If you issue a streaming query here, the changelog is produced by reading the available snapshots one after another and computing a diff. This is an expensive approach, as it requires a stateful Normalization Node that implements the diffing logic.
2. LogStore via changelog files. There are 3 different ways[3][4][5] to implement this, each with its own trade-offs. With any of them, the changelog files are available for consumption, so streaming queries become considerably cheaper.
3. LogStore via an external system, currently only Kafka. Streaming queries read records from the topic instead of changelog files on DFS. Batch queries still read only from the LSM tree on DFS; they do not access Kafka. There are multiple correctness issues with an external LogStore that, I believe, can't be fixed without making the LogStore the source of truth.

FWIW, I do not propose using Kafka as the default LogStore. Apache Paimon will, by default, produce the changelog files and store them on DFS in the same way as it already does when changelog-producer != none[2]. However, using Kafka as the LogStore enables a variety of new features (a configuration sketch follows the list below), like:

1. Support for log consumers reading directly from Kafka.
2. An optional weaker consistency model / better scalability potential, since Kafka brokers become transaction coordinators.
3. Being able to create FileStores for existing topics.
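For concreteness, here is a rough sketch of the table options that select each of the three representations. The changelog-producer values come from [2][3][4][5]; I am less sure about the exact log.system/kafka.* keys, so treat those as approximate:

import java.util.HashMap;
import java.util.Map;

public class LogStoreOptionsSketch {
    public static void main(String[] args) {
        // Representation 1: no LogStore at all (default). Streaming reads
        // go through the stateful Normalization Node that diffs snapshots.
        Map<String, String> noLogStore = new HashMap<>();
        noLogStore.put("changelog-producer", "none");

        // Representation 2: changelog files on DFS. "input" can be swapped
        // for "lookup" or "full-compaction", with different trade-offs.
        Map<String, String> changelogFiles = new HashMap<>();
        changelogFiles.put("changelog-producer", "input");

        // Representation 3: an external LogStore, currently only Kafka.
        // Streaming queries read the topic; batch queries still read the
        // LSM tree on DFS.
        Map<String, String> kafkaLogStore = new HashMap<>();
        kafkaLogStore.put("log.system", "kafka");
        kafkaLogStore.put("kafka.bootstrap.servers", "broker:9092");
        kafkaLogStore.put("kafka.topic", "t1");
    }
}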
Best,
Alex

1. https://paimon.apache.org/docs/master/concepts/file-layouts/
2. https://paimon.apache.org/docs/master/concepts/primary-key-table/#none
3. https://paimon.apache.org/docs/master/concepts/primary-key-table/#input
4. https://paimon.apache.org/docs/master/concepts/primary-key-table/#lookup
5. https://paimon.apache.org/docs/master/concepts/primary-key-table/#full-compaction

On Apr 5, 2023 at 23:28:42, Ioannis Polyzos <[email protected]> wrote:

Hi guys,

This thread is really interesting. I'm new to Paimon and still trying to wrap my head around it and how it differs from, for example, Apache Hudi. My understanding so far is that I can use it as an underlying store along with Flink to get streaming-database capabilities, with really good performance due to its LSM architecture. (Let me know if this statement is not correct.)

Basically, my source of truth would be Kafka (or some other system like Pulsar or Redpanda). The FileStore then makes a lot of sense to me for storing the data, but why would I need the LogStore, and what would be the use cases? In my mind it would be like reading data from Kafka only to write it into Kafka again. In this proposal, what happens if I don't want to use the LogStore, and why use it at all?

Please excuse me if these questions are not valid - I'm still trying to wrap my head around the value proposition here.

Thanks

On Wed, Apr 5, 2023 at 11:31 PM Alexander Sorokoumov <[email protected]> wrote:

Hi Jingsong, Shammon,

Please find below my thoughts about external log systems, their value, and a sketch of a proposal on what the systematic fix should look like. I will happily write a more detailed proposal (PIP?) if the community finds this sketch attractive.

This answer turned out to be rather long, so to sum up my points, I think that:

1. External log systems enable a variety of valuable use cases, and
2. The way to correctly enable them is to redesign the write path such that the LogStore becomes the source of truth and the coordinator on the write path.

*External log systems enable a variety of valuable use cases.*

Here are the reasons why I think external log systems are valuable:

1. With an explicitly stored log, we don't need the normalization node if the downstream consumer does not need retractions or the stream already contains them[1]. Having this option enables more flexibility around performance and functionality.
2. External log systems enable external consumers that can read the change log without Flink/Spark jobs. Integrations with external sinks are advantageous for Paimon as a streaming data lake.
3. Finally, this is an opportunity for Paimon to support different isolation/consistency models via pluggable external log systems or coordinators. The rest of this message elaborates on this point.

*The way to correctly enable them is to redesign the write path such that the LogStore becomes the source of truth and the coordinator on the write path.*

First, let's talk about the root cause of the problem at hand. Paimon's dual write path coordinates concurrent writes in the FileStore commit phase. FileStoreCommit creates a totally ordered sequence (by monotonically increasing snapshot IDs) out of concurrently running non-conflicting snapshots by[2]:

1. Observing the concurrently committed snapshot, then
2. Verifying that the current job can proceed because there is no conflict.

This way, FileStoreCommit orders concurrent snapshots one after another if it does not find conflicting key ranges, or aborts the conflicting snapshots if it does (a simplified sketch of this check follows below).
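To illustrate the observe-then-verify structure, here is a minimal sketch of that check. The names are mine, and it glosses over the real bookkeeping in FileStoreCommitImpl[2]:

import java.util.List;

public class CommitConflictSketch {

    // Simplified stand-in for the key range touched by one snapshot's writes.
    record KeyRange(String minKey, String maxKey) {
        boolean overlaps(KeyRange other) {
            return minKey.compareTo(other.maxKey) <= 0
                && other.minKey.compareTo(maxKey) <= 0;
        }
    }

    /**
     * Decide whether a commit that started from snapshot `basedOn` may be
     * installed as `latest + 1`: observe the snapshots committed concurrently
     * (step 1), then verify there is no key-range conflict with any of them
     * (step 2).
     */
    static boolean canCommit(long basedOn, long latest,
                             List<KeyRange> concurrentlyCommitted,
                             KeyRange myWrites) {
        if (basedOn == latest) {
            return true; // nothing committed concurrently
        }
        for (KeyRange committed : concurrentlyCommitted) {
            if (committed.overlaps(myWrites)) {
                return false; // conflict: this snapshot must be aborted
            }
        }
        return true; // no conflict: order this snapshot after `latest`
    }
}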
As all the changes that belong to a single snapshot are grouped together in, well, that snapshot, FileStoreCommit effectively reorders the write sequence compared to the "original" write order that happened in the write phase. In the illustration below, the LogStore has the records in the original order, which does not agree with the order in the FileStore:

https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/f6b763c70e0ddbfaabab5806ceaf6cb8c13c1fcd/concurrent-writers.png

In the log order, writes from txn1 and txn2 interleave. In the FileStore order, all writes from txn1 (grey) happened before all writes from txn2 (red).

This is generally too late for the external LogStore to change the write order, as it has already been appending records in the write phase; it can only abort or commit. *As a result, FileStore snapshots can become inconsistent with writes in the LogStore.* In other words, a reader that consumes the LogStore up to the topic offset specified in the FileStore snapshot might not be able to reproduce that snapshot.

To make my argument more specific, for the next couple of paragraphs I will assume that the external logging system is Kafka, configured with EXACTLY_ONCE.

If we look at Apache Paimon's write path as it is right now, there are 3 conceptual roles:

1. FileStore writer - the current "source of truth".
2. LogStore writer - external, or optionally derived[3] from FileStore snapshots when changelog-producer != input.
3. Write coordinator - required for conflict resolution. This role currently requires the external Catalog Lock (disabled by default, configured by lock.enabled; the only available implementation is HiveCatalogLock[4]) to guarantee Snapshot Isolation (SI). Without this lock, the FileStore writer can't atomically (CAS) install a new snapshot without the risk of overwriting a concurrently committed one. The coordinator is currently implemented in and combined with the FileStore writer.

https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/83a8b3299cefe6a5f0d099f0e0e074b73594bfbc/paimon-write-path.png

I propose to change these roles such that:

- Write order in the LogStore is "the source of truth". The default implementation will write the change log and the set of keys needed to resolve SI conflicts.
- The write coordinator is implemented in the LogStore instead of the FileStore. There are 2 implementations:
  - "Internal changelog" (default), which guarantees Snapshot Isolation via an optional Hive Catalog Lock.
  - External logging systems, which also act as coordinators and provide guarantees specific to that system. For example, with Kafka as the logging system, Kafka EXACTLY_ONCE transactions determine the consistency semantics (a writer sketch follows below).
- In turn, FileStore writers read the change log and "materialize" it into snapshots in the background. The FileStore writes can be done either in a blocking way or as a standalone job, similar to compaction.
- In addition, we will need to adjust the read path. It will use the latest "materialized" snapshot as a base and optionally apply the not-yet-materialized changelog on top of it.

https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/83a8b3299cefe6a5f0d099f0e0e074b73594bfbc/proposed-write-path.png
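To make this less abstract, here is a rough sketch of what a LogStore writer with Kafka as the coordinator could look like under this proposal. The topic names and the side channel for the SI key sets are hypothetical; only the Kafka transactional API is real:

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LogStoreWriterSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");
        props.put("transactional.id", "paimon-writer-job1"); // one id per logical writer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            try {
                // 1. Append the changelog itself; with the LogStore as the
                //    source of truth, this log order is the write order.
                for (String key : List.of("x1", "x2")) {
                    producer.send(new ProducerRecord<>("t1-changelog", key, "value-of-" + key));
                }
                // 2. Append the set of written keys to a (hypothetical) side
                //    topic so the materializing FileStore writer can run the
                //    SI key-range check against concurrent transactions.
                producer.send(new ProducerRecord<>("t1-txn-keys", "job1", "x1..x2"));
                // 3. The Kafka transaction commit is the only commit decision;
                //    FileStore snapshots are materialized from the log later,
                //    in the background, similar to compaction.
                producer.commitTransaction();
            } catch (Exception e) {
                producer.abortTransaction();
                throw e;
            }
        }
    }
}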
With this change, we achieve several interesting properties, for example:

- Writes in the LogStore are always consistent with FileStore snapshots, with both the internal and external logging systems (as in, FileStore snapshots become actual snapshots of the LogStore at a certain offset). This property eliminates a whole class of anomalies caused by the FileStore and the LogStore not agreeing on the write order and visibility.
- The actual consistency guarantees (the consistency model of the distributed system) offered by Apache Paimon depend on the LogStore implementation. While the default LogStore implementation still offers SI, the Kafka-based implementation will offer weaker consistency guarantees that might be useful for cases with higher contention.
- We can use Kafka as the external log system and the coordinator to enable non-Flink/Spark consumers that read directly from Kafka.
- Even though I talked about Kafka here, it will be easier to plug in other external logging systems, allowing other logging sinks and consistency models.

Please let me know what you think about it. I am aware that this sketch of a proposal implies quite a redesign of the write and read paths, but, in my opinion, there is no easier way to solve the consistency issues between the FileStore and external LogStores and to make Apache Paimon's consistency model extensible.

1. https://paimon.apache.org/docs/master/concepts/primary-key-table/#none
2. https://github.com/apache/incubator-paimon/blob/4a56565ea9bd30efabefd09734432d34c48aff22/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java#L506C15-L514
3. https://paimon.apache.org/docs/master/concepts/primary-key-table/#changelog-producers
4. https://github.com/apache/incubator-paimon/blob/4a56565ea9bd30efabefd09734432d34c48aff22/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java#L44

Best,
Alex

On Mar 31, 2023 at 04:49:15, Shammon FY <[email protected]> wrote:

Hi Alexander and Jingsong,

Sorry for joining the discussion late. I think @Alexander raised an important question: the data consistency between the log system and the file store in Paimon. In fact, it also confuses me.

In addition to the case of multiple jobs writing to the same table, even when a single job writes to a table, there will be different data in the log system and the file store after the job fails over.

As @Jingsong mentioned above, the external log system serves as the binlog storage of the store, but the file store won't recover from the log system. It recovers from the checkpoint in the upstream stream-processing system, which will cause a data gap between the log system and the file store.

So my question is: do we really need a log system? If so, is it necessary to ensure data consistency between the log system and the file store - for example, by rolling back the uncommitted data in the log system according to the file store?

If users don't care about the "wrong data" in the log system, should Paimon support reading "uncommitted" data in the future, to support millisecond-level delays without a log system?

Hope to hear from you, thanks.

Best,
Shammon FY

On Tue, Mar 28, 2023 at 3:27 PM Jingsong Li <[email protected]> wrote:

Hi Alexander,

Thanks for starting this discussion.

Your analysis is right; currently, the Kafka integration has such inconsistency issues.

You can share your thoughts on this.
I'll share my thoughts here:

The external log system makes a lot of sense, but currently there are some usability issues.

The Kafka transaction confuses me. If the Kafka transaction is turned on, then why not just read the incremental data from the FileStore? There is not much difference in their latency.

So my idea is that the external log system serves as the binlog storage of the store: its data should be written by the store, and it can make the data visible in advance to reduce latency, so that Paimon really provides millisecond-level stream read latency. There is still a lot of design and advancement missing here.

Best,
Jingsong

On Tue, Mar 28, 2023 at 10:44 AM Alexander Sorokoumov <[email protected]> wrote:

Hi Paimon community,

Congratulations on starting the incubating project!

I have a question about preserving the write order and visibility between the LogStore and the FileStore when the LogStore is configured as an external system, e.g., Kafka. To my knowledge, Paimon does not coordinate writes to the LogStore and the FileStore between multiple Flink jobs writing to the same table. The only check I am aware of is the Snapshot Isolation key-range check for concurrent snapshot commits.

If the assumptions above are correct, then the record order in Kafka does not really correspond to the order in the FileStore's snapshots. In addition, Kafka topics have different visibility guarantees under EXACTLY_ONCE, compared to the FileStore. As a motivating example, consider the following setup (sketched in code after the list):

1. A Paimon table configured with Kafka as an external log system. The changelog topic t1 has 1 partition.
2. There are 2 concurrent Flink jobs writing non-intersecting key ranges to the same table. Job1 writes keys {x1-x8}, Job2 writes keys {y1-y4}.
3. Both jobs write with the EXACTLY_ONCE delivery guarantee, so there are 2 Kafka transactions - txn1 for Job1 and txn2 for Job2.
4. The final record order in t1 is [x1, x2, x3, x4, y1, y2, x5, x6, x7, y3, y4, x8].
5. Snapshot1 of Job1 is concurrent with Snapshot2 of Job2. Snapshot1 will commit first, Snapshot2 will commit second.
6. See the illustration for clarification: https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/f6b763c70e0ddbfaabab5806ceaf6cb8c13c1fcd/concurrent-writers.png
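Here is a self-contained sketch of that setup with plain Kafka clients. The broker address and the single-process interleaving are illustrative; two real jobs would race on the broker, but the committed/open transaction states are the point:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OverlappingTxnSketch {

    static KafkaProducer<String, String> transactionalProducer(String txnId) {
        Properties p = new Properties();
        p.put("bootstrap.servers", "broker:9092");
        p.put("transactional.id", txnId);
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(p);
        producer.initTransactions();
        return producer;
    }

    static void append(KafkaProducer<String, String> producer, List<String> keys) {
        for (String key : keys) {
            producer.send(new ProducerRecord<>("t1", key, key));
        }
        producer.flush(); // best effort to reproduce the interleaved order
    }

    public static void main(String[] args) {
        KafkaProducer<String, String> job1 = transactionalProducer("job1"); // txn1
        KafkaProducer<String, String> job2 = transactionalProducer("job2"); // txn2
        job1.beginTransaction();
        job2.beginTransaction();

        // Interleave the appends so t1 ends up as
        // [x1, x2, x3, x4, y1, y2, x5, x6, x7, y3, y4, x8].
        append(job1, List.of("x1", "x2", "x3", "x4"));
        append(job2, List.of("y1", "y2"));
        append(job1, List.of("x5", "x6", "x7"));
        append(job2, List.of("y3", "y4"));
        append(job1, List.of("x8"));

        job1.commitTransaction(); // Snapshot1 commits; txn2 is still open.

        // A READ_COMMITTED consumer at this point stalls at the last stable
        // offset, i.e., txn2's first record (y1): even the committed txn1
        // records written after y1 stay invisible until txn2 commits.
        Properties c = new Properties();
        c.put("bootstrap.servers", "broker:9092");
        c.put("group.id", "demo");
        c.put("auto.offset.reset", "earliest");
        c.put("isolation.level", "read_committed");
        c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
            consumer.subscribe(List.of("t1"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> r : records) {
                System.out.println(r.offset() + " -> " + r.key()); // only x1-x4
            }
        }

        job2.commitTransaction(); // Now the rest becomes visible at once.
        job1.close();
        job2.close();
    }
}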
In this setup, the following anomalies are possible:

1. Consider a continuous streaming query submitted after Snapshot1 committed, but before Snapshot2 committed. Such a query returns Snapshot1 and then streams the newer records. The snapshot will contain records {x1-x8} and won't contain {y1-y4}. The streaming results would tail the records after the last covered offset, so after 12. As a result, such a query won't ever return the records from txn2.
2. Consider a batch query submitted after Snapshot1 is committed, but before Snapshot2 is committed. Such a query would return {x1-x8}. However, a following streaming query reading from offset 1 would not return any results until Snapshot2 commits. This would happen because the streaming query would read from the Kafka topic under READ_COMMITTED. As txn1 and txn2 overlap, txn1's records are not visible until txn2 commits.

I think I have some ideas about this, but I'm worried the scope might be too large, and I'm not sure how important external log systems are to the Paimon roadmap.

Best,
Alexander
