Thanks very much, Alex!

I understand your proposal. It is very detailed and very good!

I need some time to digest it.

Best,
Jingsong

On Fri, Apr 7, 2023 at 5:46 AM Alexander Sorokoumov
<[email protected]> wrote:
>
> Hey Ioannis,
>
>
> I am also relatively new to Apache Paimon, so I may be wrong. There are 2
> state representations in Paimon - FileStore and LogStore. FileStore is the
> LSM tree - a set of immutable data files organized via the compaction
> process with Snapshot and Manifest metadata files that allow point-in-time
> and streaming queries[1]. LogStore has 3 possible implementations:
>
>    1. No LogStore at all (default)[2]. If you issue a streaming query here,
>    the changelog is produced by reading available snapshots one after another
>    and producing a diff. This is an expensive approach as it requires a
>    stateful Normalization Node that implements the diffing logic.
>    2. LogStore via changelog files. There are 3 different ways[3][4][5] to
>    implement this, each with its own trade-offs. With any of them, the
>    changelog files are available for consumption, so streaming queries
>    become considerably cheaper.
>    3. LogStore via an external system, currently only Kafka. Streaming
>    queries read records from the topic instead of changelog files on DFS.
>    Batch queries still read only from the LSM tree on DFS; they do not access
>    Kafka. There are multiple correctness issues with external LogStore that, I
>    believe, can’t be fixed without making LogStore the source of truth.
>
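
To make option 1 above concrete, here is a minimal Python sketch of deriving a changelog by diffing two consecutive snapshots. It is only an illustration under simplifying assumptions: snapshots are modeled as plain dicts, and `diff_snapshots` and the row-kind strings are hypothetical names, not Paimon APIs.

```python
from typing import Dict, List, Tuple

# Hypothetical model: a snapshot maps a primary key to its current value.
Snapshot = Dict[str, int]
# A change is (row_kind, key, value), using Flink-style row kind labels.
Change = Tuple[str, str, int]


def diff_snapshots(old: Snapshot, new: Snapshot) -> List[Change]:
    """Derive the changelog that turns `old` into `new`."""
    changes: List[Change] = []
    for key in sorted(old.keys() | new.keys()):
        if key not in old:
            changes.append(("+I", key, new[key]))   # insert
        elif key not in new:
            changes.append(("-D", key, old[key]))   # delete
        elif old[key] != new[key]:
            changes.append(("-U", key, old[key]))   # retract old value
            changes.append(("+U", key, new[key]))   # emit new value
    return changes


snapshot_1 = {"a": 1, "b": 2}
snapshot_2 = {"a": 1, "b": 3, "c": 4}
changelog = diff_snapshots(snapshot_1, snapshot_2)
print(changelog)
```

The diff needs the previous value of every key, which is why this approach requires state on the reader side.
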
>
> FWIW, I do not propose using Kafka as the default LogStore. Apache Paimon
> will, by default, produce the changelog files and store them on DFS in the
> same way as it does already when changelog-producer!=none[2]. However,
> using Kafka as the LogStore enables a variety of new features, like:
>
>    1. Support for log consumers directly from Kafka.
>    2. Optional weaker consistency model / better scalability potential
>    since Kafka brokers become transaction coordinators.
>    3. Being able to create FileStores for existing topics.
>
>
> Best,
>
> Alex
>
>
>    1. https://paimon.apache.org/docs/master/concepts/file-layouts/
>    2. https://paimon.apache.org/docs/master/concepts/primary-key-table/#none
>    3. https://paimon.apache.org/docs/master/concepts/primary-key-table/#input
>    4. https://paimon.apache.org/docs/master/concepts/primary-key-table/#lookup
>    5. https://paimon.apache.org/docs/master/concepts/primary-key-table/#full-compaction
>
>
>
>
>
> On Apr 5, 2023 at 23:28:42, Ioannis Polyzos <[email protected]> wrote:
>
> > Hi guys,
> > this thread is really interesting. I'm new to Paimon and still trying to
> > wrap my head around it and how it differs from Apache Hudi, for
> > example.
> > But my understanding so far is that I can use it as an underlying store
> > along with Flink to get streaming database capabilities - with really good
> > performance due to its LSM architecture. (Let me know if this statement is
> > not correct).
> > Basically, my source of truth would be Kafka (or some other system like
> > Pulsar or Redpanda). The FileStore makes a lot of sense for me then to
> > store the data, but why would I need the LogStore and what would be the use
> > cases?
> > I.e., in my mind it would be like reading data from Kafka to write it again
> > in Kafka. In this proposal, what happens if I don't want to use the
> > LogStore? And why use it at all?
> > Please excuse me if these questions are not valid - as I'm still trying to
> > wrap my head around the value proposition here.
> >
> > Thanks
> >
> > On Wed, Apr 5, 2023 at 11:31 PM Alexander Sorokoumov <
> > [email protected]> wrote:
> >
> > Hi Jingsong, Shammon,
> >
> > Please find below my thoughts about external log systems, their value, and
> > a sketch of a proposal on what the systematic fix should look like. I will
> > happily write a more detailed proposal (PIP?) if the community finds this
> > sketch attractive.
> >
> > This answer turned out to be rather long, so to sum up my points, I think
> > that:
> >
> >    1. External log systems enable a variety of valuable use cases, and
> >    2. The way to correctly enable them is to redesign the write path such
> >    that the LogStore becomes the source of truth and the coordinator on the
> >    write path.
> >
> > *External log systems enable a variety of valuable use cases.*
> >
> > Here are reasons why I think external log systems are valuable:
> >
> >    1. With an explicitly stored log, we don’t need the normalization node
> >    if the downstream consumer does not need retractions or the stream
> >    already contains them[1]. Having this option enables more flexibility
> >    around performance and functionality.
> >    2. External log systems enable external consumers that can read the
> >    change log without Flink/Spark jobs. Integrations with external sinks
> >    are advantageous for Paimon as a streaming data lake.
> >    3. Finally, this is an opportunity for Paimon to support different
> >    isolation/consistency models via pluggable external log systems or
> >    coordinators. The rest of this message elaborates on this point.
> >
> > *The way to correctly enable them is to redesign the write path such that
> > the LogStore becomes the source of truth and the coordinator on the write
> > path.*
> >
> > First, let’s talk about the root cause of the problem at hand. Paimon’s
> > dual write path coordinates concurrent writes in the FileStore Commit
> > phase. FileStoreCommit creates a totally ordered sequence (by monotonically
> > increasing snapshot IDs) out of concurrently running non-conflicting
> > snapshots by[2]:
> >
> >    1. Observing the concurrently committed snapshot, then
> >    2. Verifying that the current job can proceed because there is no
> >    conflict.
> >
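
The two steps above can be sketched as a toy optimistic commit in Python. This is a rough illustration, not Paimon's FileStoreCommit: `ToyCommitter` and `Snapshot` are hypothetical names, conflicts are detected on plain key sets rather than key ranges, and the sketch ignores the atomicity that the catalog lock provides.

```python
from dataclasses import dataclass, field
from typing import FrozenSet, List


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    keys: FrozenSet[str]


@dataclass
class ToyCommitter:
    """Hypothetical stand-in for the commit phase; not a Paimon class."""
    snapshots: List[Snapshot] = field(default_factory=list)

    def latest_id(self) -> int:
        return self.snapshots[-1].snapshot_id if self.snapshots else 0

    def commit(self, base_id: int, keys: FrozenSet[str]) -> Snapshot:
        # Step 1: observe snapshots committed since this writer's base.
        concurrent = [s for s in self.snapshots if s.snapshot_id > base_id]
        # Step 2: proceed only if none of them touched the same keys.
        for other in concurrent:
            if other.keys & keys:
                raise RuntimeError(f"conflict with snapshot {other.snapshot_id}")
        snapshot = Snapshot(self.latest_id() + 1, keys)
        self.snapshots.append(snapshot)
        return snapshot


committer = ToyCommitter()
base = committer.latest_id()  # both jobs start from the same base snapshot
s1 = committer.commit(base, frozenset({"x1", "x2"}))
s2 = committer.commit(base, frozenset({"y1"}))  # no overlap: ordered after s1
try:
    committer.commit(base, frozenset({"x2"}))   # overlaps with s1: aborted
    conflict_detected = False
except RuntimeError:
    conflict_detected = True
print(s1.snapshot_id, s2.snapshot_id, conflict_detected)
```
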
> >
> > This way, FileStoreCommit orders concurrent snapshots one after another if
> > it does not find conflicting key ranges, or aborts the conflicting
> > snapshots if it does. As all changes that belong to a single snapshot are
> > grouped together in, well, that snapshot, FileStoreCommit effectively
> > reorders the write sequence compared to the “original” write order that
> > happened in the write phase. In the illustration below, LogStore has
> > records in the original order, and it does not agree with the order in the
> > FileStore:
> >
> > https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/f6b763c70e0ddbfaabab5806ceaf6cb8c13c1fcd/concurrent-writers.png
> >
> > In the log order, writes from txn1 and txn2 interleave. In the FileStore
> > order, all writes from txn1 (grey) happened before all writes from txn2
> > (red).
> >
> > This is generally too late for the external LogStore to change the write
> > order as it has been appending records in the write phase; it can only
> > abort or commit. *As a result, FileStore snapshots can become inconsistent
> > with writes in the LogStore.* In other words, a reader that consumes the
> > LogStore up to the topic offset specified in the FileStore snapshot might
> > not be able to reproduce that snapshot.
> >
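
A small Python sketch of that mismatch, using the record order from the motivating example in the original message of this thread. It assumes, for illustration only, that a snapshot records the topic offset just past its last record; the variable names are hypothetical.

```python
# Record order in the single-partition topic t1, as given in the example.
log = ["x1", "x2", "x3", "x4", "y1", "y2", "x5", "x6", "x7", "y3", "y4", "x8"]

# Snapshot 1 (Job1, txn1) commits first and contains only Job1's keys.
snapshot_1 = {key for key in log if key.startswith("x")}

# Assumption: the snapshot stores the offset just past its last log record.
snapshot_1_offset = log.index("x8") + 1

# Replaying the log up to that offset also picks up txn2's records.
replayed = set(log[:snapshot_1_offset])

print(sorted(replayed - snapshot_1))
```

The replayed prefix contains Job2's keys even though Snapshot 1 does not, so the log prefix and the snapshot disagree.
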
> >
> > To make my argument more specific, for the next couple of paragraphs, I
> > will assume that the external logging system is Kafka, configured with
> > EXACTLY_ONCE.
> >
> > If we look at Apache Paimon’s write path as it is right now, there are 3
> > conceptual roles:
> >
> >    1. FileStore writer - current “source of truth”.
> >    2. LogStore writer - external or optionally derived[3] from FileStore
> >    snapshots when changelog-producer != Input.
> >    3. Write coordinator - is required for conflict resolution. This role
> >    currently requires the external Catalog Lock (disabled by default,
> >    configured by lock.enabled, the only available implementation is
> >    HiveCatalogLock[4]) to guarantee Snapshot Isolation (SI); without this
> >    lock, the FileStore writer can’t atomically (CAS) install the new
> >    snapshot without the risk of overwriting a concurrently committed one.
> >    The coordinator is currently implemented in, and combined with, the
> >    FileStore writer.
> >
> > https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/83a8b3299cefe6a5f0d099f0e0e074b73594bfbc/paimon-write-path.png
> >
> > I propose to change these roles such that:
> >
> >    - Write order in LogStore is “the source of truth”. The default
> >    implementation will write the change log and the set of keys to resolve
> >    SI conflicts.
> >    - Write coordinator is implemented in the LogStore instead of FileStore.
> >    There are 2 implementations:
> >       - “Internal changelog” (default) that guarantees Snapshot Isolation
> >       via an optional Hive Catalog Lock.
> >       - External logging systems that also act as a coordinator and
> >       provide guarantees specific to that system. For example, with Kafka
> >       as the logging system, Kafka EXACTLY_ONCE transactions determine the
> >       consistency semantics.
> >    - In turn, FileStore writers read the change log and “materialize” it
> >    into snapshots in the background. The FileStore writes can be done
> >    either in a blocking way or as a standalone job, similar to compaction.
> >    - In addition, we will need to adjust the read path. It will use the
> >    latest “materialized” snapshots as a base and optionally apply a
> >    not-yet-materialized changelog on top of it.
> >
> > https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/83a8b3299cefe6a5f0d099f0e0e074b73594bfbc/proposed-write-path.png
> >
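
The adjusted read path could look roughly like the Python sketch below: start from the latest materialized snapshot and apply the changelog entries that have not been materialized yet. The data model (dict snapshots, `(offset, key, value)` entries with `None` meaning delete) and the name `read_latest` are assumptions made for illustration, not part of Paimon.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical changelog entry: (offset, key, value); value None means delete.
Entry = Tuple[int, str, Optional[int]]


def read_latest(base: Dict[str, int], base_offset: int,
                changelog: List[Entry]) -> Dict[str, int]:
    """Apply entries at or past `base_offset` on top of the materialized base.

    Entries with offset < base_offset are assumed to be in `base` already.
    """
    view = dict(base)  # copy, so the materialized snapshot stays untouched
    for offset, key, value in sorted(changelog, key=lambda entry: entry[0]):
        if offset < base_offset:
            continue  # already materialized into the base snapshot
        if value is None:
            view.pop(key, None)
        else:
            view[key] = value
    return view


changelog = [(0, "a", 1), (1, "b", 2), (2, "a", 5), (3, "b", None)]
base = {"a": 1, "b": 2}  # materialized from offsets 0 and 1
view = read_latest(base, 2, changelog)
print(view)
```
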
> >
> > With this change, we achieve several interesting properties, for example:
> >
> >    - Writes in LogStore are always consistent with FileStore snapshots
> >    (as in, FileStore snapshots become the actual snapshots of the LogStore
> >    at a certain offset), with both the internal and external logging
> >    systems. This property eliminates a whole class of anomalies caused by
> >    FileStore and LogStore not agreeing on the write order and visibility.
> >    - The actual consistency guarantees (consistency model of the
> >    distributed system) offered by Apache Paimon depend on the LogStore
> >    implementation. While the default LogStore implementation still offers
> >    SI, the Kafka-based implementation will offer weaker consistency
> >    guarantees that might be useful for cases with higher contention.
> >    - We can use Kafka as the external log system and the coordinator to
> >    enable non-Flink/Spark consumers that read directly from Kafka.
> >    - Even though I talked about Kafka here, it will be easier to plug in
> >    other external logging systems, allowing other logging sinks and
> >    consistency models.
> >
> > Please let me know what you think about it. I am aware that this sketch of
> > a proposal implies quite a redesign of the write and read paths, but, in my
> > opinion, there is no easier way to solve consistency issues between
> > FileStore and external LogStores and make Apache Paimon’s consistency model
> > extensible.
> >
> >    1. https://paimon.apache.org/docs/master/concepts/primary-key-table/#none
> >    2. https://github.com/apache/incubator-paimon/blob/4a56565ea9bd30efabefd09734432d34c48aff22/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java#L506C15-L514
> >    3. https://paimon.apache.org/docs/master/concepts/primary-key-table/#changelog-producers
> >    4. https://github.com/apache/incubator-paimon/blob/4a56565ea9bd30efabefd09734432d34c48aff22/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java#L44
> >
> > Best,
> > Alex
> >
> > On Mar 31, 2023 at 04:49:15, Shammon FY <[email protected]> wrote:
> >
> > > Hi Alexander and Jingsong
> > >
> > > Sorry for joining the discussion late. I think @Alexander raises an
> > > important question: the data consistency between the log system and the
> > > file store in Paimon. In fact, it also confuses me.
> > >
> > > In addition to multiple jobs writing to the same table, even when a job
> > > writes to a table, there will be different data in the log system and
> > > the file store after the job fails over.
> > >
> > > As @Jingsong mentioned above, `external log system as the binlog storage
> > > of the store`, but the file store won't recover from the log system. It
> > > recovers from the `checkpoint` in the upstream stream processing system,
> > > which will cause a data gap between the log system and the file store.
> > >
> > > So my question is: do we really need a log system? If so, is it necessary
> > > to ensure data consistency between the log system and the file store? For
> > > example, roll back the uncommitted data of the log system according to
> > > the file store?
> > >
> > > If users don't care about the `wrong data` in the log system, should
> > > Paimon support users reading 'uncommitted' data in the future to support
> > > millisecond-level delays without a log system?
> > >
> > > Hope to hear from you, thanks
> > >
> > > Best,
> > > Shammon FY
> > >
> > > On Tue, Mar 28, 2023 at 3:27 PM Jingsong Li <[email protected]>
> > > wrote:
> > >
> > > Hi Alexander,
> > >
> > > Thanks for starting this discussion.
> > >
> > > Your analysis is right: the Kafka integration currently has such
> > > inconsistency issues.
> > > You can share your thoughts on this.
> > >
> > > I'll share my thoughts here:
> > >
> > > The external log system makes a lot of sense, but currently there are
> > > some usability issues.
> > >
> > > The Kafka transaction confuses me. If the Kafka transaction is turned
> > > on, then why not just read the incremental data from the FileStore?
> > > There is not much difference in their latency.
> > >
> > > So my idea is to use the external log system as the binlog storage of
> > > the store: its data should be written by the store, and it can make the
> > > data visible in advance to reduce the latency, so that Paimon really
> > > provides millisecond-level stream read latency.
> > > There is still a lot of design and advancement missing here.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Tue, Mar 28, 2023 at 10:44 AM Alexander Sorokoumov
> > > <[email protected]> wrote:
> > > >
> > > > Hi Paimon community,
> > > >
> > > > Congratulations on starting the incubating project!
> > > >
> > > > I have a question about preserving the write order and visibility between
> > > > LogStore and FileStore when LogStore is configured as an external system,
> > > > e.g., Kafka. To my knowledge, Paimon does not coordinate writes to LogStore
> > > > and FileStore between multiple Flink jobs writing to the same table. The
> > > > only check I am aware of is Snapshot Isolation key range check for
> > > > concurrent snapshot commits.
> > > >
> > > > If the assumptions above are correct, then the record order in Kafka does
> > > > not really correspond to the order in FileStore’s snapshots. In addition,
> > > > Kafka topics have different visibility guarantees under EXACTLY_ONCE,
> > > > compared to FileStore. As a motivating example, consider the following
> > > > setup:
> > > >
> > > >    1. Paimon table configured with Kafka as an external log system. The
> > > >    change log topic t1 has 1 partition.
> > > >    2. There are 2 concurrent Flink jobs writing non-intersecting key
> > > >    ranges to the same table. Job1 writes keys {x1-x8}, Job2 writes keys
> > > >    {y1-y4}.
> > > >    3. Both jobs write with EXACTLY_ONCE delivery guarantee, so there are 2
> > > >    Kafka transactions - txn1 for Job1 and txn2 for Job2.
> > > >    4. The final record order in t1 is [x1, x2, x3, x4, y1, y2, x5, x6, x7,
> > > >    y3, y4, x8].
> > > >    5. Snapshot1 of Job1 is concurrent to Snapshot2 of Job2. Snapshot1 will
> > > >    commit first, Snapshot2 will commit second.
> > > >    6. See the illustration for clarification:
> > > >    https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/f6b763c70e0ddbfaabab5806ceaf6cb8c13c1fcd/concurrent-writers.png
> > > >
> > > > In this setup, the following anomalies are possible:
> > > >
> > > >    1. Consider a continuous streaming query submitted after Snapshot 1
> > > >    committed, before Snapshot 2 committed. Such a query returns Snapshot 1
> > > >    and then streams newer records. The snapshot will contain records {x1-x8}
> > > >    and won’t contain {y1-y2}. The streaming results would tail records after
> > > >    the last covered offset, so after 12. As a result, such a query won’t
> > > >    ever return records from txn2.
> > > >    2. Consider a batch query submitted after Snapshot 1 is committed,
> > > >    before Snapshot 2 is committed. Such a query would return {x1-x8}.
> > > >    However, a following streaming query reading from offset 1 would not
> > > >    return any results until Snapshot 2 commits. This would happen because
> > > >    the streaming query would read from the Kafka topic under
> > > >    READ_COMMITTED. As txn1 and txn2 overlap, txn1 records are not visible
> > > >    until txn2 commits.
> > > >
> > > > I think I have some ideas about this, but I'm worried the scope might be
> > > > too large, and I'm not sure how important external log systems are to the
> > > > Paimon roadmap.
> > > >
> > > > Best,
> > > > Alexander
> >
