Thanks, for all the detailed explanations.
This is some really useful context to start wrapping my head around the
value proposition (compared to Hudi for example).
I'm looking forward to seeing and learning more.

Best

On Fri, Apr 7, 2023 at 11:36 AM Jingsong Li <[email protected]> wrote:

> Thanks very much! Alex.
>
> I understand your proposal, Very detailed, very good!
>
> I need some time to digest it.
>
> Best,
> Jingsong
>
> On Fri, Apr 7, 2023 at 5:46 AM Alexander Sorokoumov
> <[email protected]> wrote:
> >
> > Hey Ioannis,
> >
> >
> > I am also relatively new to Apache Paimon, so I may be wrong. There are 2
> > state representations in Paimon - FileStore and LogStore. FileStore is
> the
> > LSM tree - a set of immutable data files organized via the compaction
> > process with Snapshot and Manifest metadata files that allow
> point-in-time
> > and streaming queries[1]. LogStore has 3 possible implementations:
> >
> >    1. No LogStore at all (default)[2] If you issue a streaming query
> here,
> >    the changelog is produced by reading available snapshots one after
> another
> >    and producing a diff. This is an expensive approach as it requires a
> >    stateful Normalization Node that implements the diffing logic.
> >    2. LogStore via changelog files. There are 3 different ways[3][4][5]
> how
> >    this can be implemented with their different trade-offs. With any of
> them,
> >    the changelog files are available for consumption, so streaming
> queries
> >    become considerably cheaper.
> >    3. LogStore via an external system, currently only Kafka. Streaming
> >    queries read records from the topic instead of changelog files on DFS.
> >    Batch queries still read only from the LSM tree on DFS; they do not
> access
> >    Kafka. There are multiple correctness issues with external LogStore
> that, I
> >    believe, can’t be fixed without making LogStore the source of truth.
> >
> >
> > FWIW, I do not propose using Kafka as the default LogStore. Apache Paimon
> > will, by default, produce the changelog files and store them on DFS in
> the
> > same way as it does already when changelog-producer!=none[2]. However,
> > using Kafka as the LogStore enables a variety of new features, like:
> >
> >    1. Support for log consumers directly from Kafka.
> >    2. Optional weaker consistency model / better scalability potential
> >    since Kafka brokers become transaction coordinators.
> >    3. Being able to create FileStores for existing topics.
> >
> >
> > Best,
> >
> > Alex
> >
> >
> >    1. https://paimon.apache.org/docs/master/concepts/file-layouts/
> >    2.
> https://paimon.apache.org/docs/master/concepts/primary-key-table/#none
> >    3.
> >
> https://paimon.apache.org/docs/master/concepts/primary-key-table/#input
> >    4.
> >
> https://paimon.apache.org/docs/master/concepts/primary-key-table/#lookup
> >    5.
> >
> https://paimon.apache.org/docs/master/concepts/primary-key-table/#full-compaction
> >
> >
> >
> >
> >
> > On Apr 5, 2023 at 23:28:42, Ioannis Polyzos <[email protected]>
> wrote:
> >
> > > Hi guys,
> > > this thread is really interesting. I'm new to Paimon and still trying
> to
> > > wrap my head around it and how it differentiates from Apache Hudi for
> > > example.
> > > But my understanding so far is that I can use it as an underlying store
> > > along with Flink to get streaming database capabilities - with really
> good
> > > performance due to its LSM architecture. (Let me know if this
> statement is
> > > not correct).
> > > Basically, my source of truth would be Kafka (or some other system like
> > > Pulsar or Redpanda). The FileStore makes a lot of sense for me then to
> > > store the data, but why would I need the LogStore and what would be
> the use
> > > cases?
> > > i.e in my mind it would be like reading data from Kafka to write it
> again
> > > in Kafka. In this proposal what happens if I don't want to use the
> > > LogStore? and why use it at all?
> > > Please excuse me if these questions are not valid - as I'm still
> trying to
> > > wrap my head around the value proposition here.
> > >
> > > Thanks
> > >
> > > On Wed, Apr 5, 2023 at 11:31 PM Alexander Sorokoumov <
> > > [email protected]> wrote:
> > >
> > >  Hi Jingsong, Shammon,
> > >
> > >
> > > Please find below my thoughts about external log systems, their value,
> and
> > >
> > > a sketch of a proposal on what should the systematic fix look like. I
> will
> > >
> > > happily write a more detailed proposal (PIP?) if the community finds
> this
> > >
> > > sketch attractive.
> > >
> > >
> > > This answer turned out to be rather long, so to sum up my points, I
> think
> > >
> > > that:
> > >
> > >
> > >    1. External log systems enable a variety of valuable use cases, and
> > >
> > >    2. The way to correctly enable them is to redesign the write path
> such
> > >
> > >    that the LogStore becomes the source of truth and the coordinator
> on the
> > >
> > >    write path.
> > >
> > >
> > >
> > > *External log systems enable a variety of valuable use cases.*
> > >
> > >
> > > Here are reasons why I think external log systems are valuable:
> > >
> > >
> > >    1. With an explicitly stored log, we don’t need the normalization
> node
> > >
> > >    if the downstream consumer does not need retractions or the stream
> > >
> > > already
> > >
> > >    contains them[1]. Having this option enables more flexibility around
> > >
> > >    performance and functionality.
> > >
> > >    2. External log systems enable external consumers that can read the
> > >
> > >    change log without Flink/Spark jobs. Integrations with external
> sinks
> > >
> > > are
> > >
> > >    advantageous for Paimon as a streaming data lake.
> > >
> > >    3. Finally, this is an opportunity for Paimon to support different
> > >
> > >    isolation/consistency models via pluggable external log systems or
> > >
> > >    coordinators. The rest of this message elaborates on this point.
> > >
> > >
> > >
> > > *The way to correctly enable them is to redesign the write path such
> that
> > >
> > > the LogStore becomes the source of truth and the coordinator on the
> write
> > >
> > > path.*
> > >
> > >
> > > First, let’s talk about the root cause of the problem at hand. Paimon’s
> > >
> > > dual write path coordinates concurrent writes in the FileStore Commit
> > >
> > > phase. FileStoreCommit creates a totally ordered sequence (by
> monotonically
> > >
> > > increasing snapshot IDs) out of concurrently running non-conflicting
> > >
> > > snapshots by[2]:
> > >
> > >
> > >    1. Observing the concurrently committed snapshot, then
> > >
> > >    2. Verifying that the current job can proceed because there is no
> > >
> > >    conflict.
> > >
> > >
> > >
> > > This way, FileStoreCommit orders concurrent snapshots one after
> another if
> > >
> > > it does not find conflicting key ranges or aborts conflicting
> snapshots it
> > >
> > > does. As all changes that belong to a single snapshot are grouped
> together
> > >
> > > in, well, that snapshot, FileStoreCommit effectively reorders the write
> > >
> > > sequence compared to the “original” write order that happened in the
> write
> > >
> > > phase. In the illustration below LogStore has records in the original
> order
> > >
> > > and it does not agree with the order in the FileStore:
> > >
> > >
> > >
> > >
> > >
> https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/f6b763c70e0ddbfaabab5806ceaf6cb8c13c1fcd/concurrent-writers.png
> > >
> > >
> > > In the log order, writes from txn1 and txn2 interleave. In the
> FileStore
> > >
> > > order, all writes from txn1 (grey) happened before all writes from txn2
> > >
> > > (red).
> > >
> > >
> > > This is generally too late for the external LogStore to change the
> write
> > >
> > > order as it has been appending records in the write phase; it can only
> > >
> > > abort or commit. *As a result, FileStore snapshots can become
> inconsistent
> > >
> > > with writes in the LogStore.* In other words, a reader that consumes
> the
> > >
> > > LogStore up to the topic offset specified in the FileStore snapshot
> might
> > >
> > > not be able to reproduce that snapshot.
> > >
> > >
> > > To make my argument more specific, for the next couple of paragraphs, I
> > >
> > > will assume that the external logging system is Kafka, configured with
> > >
> > > EXACTLY_ONCE.
> > >
> > >
> > > If we look at Apache Paimon’s write path as it is right now, there are
> 3
> > >
> > > conceptual roles:
> > >
> > >
> > >    1. FileStore writer - current “source of truth".
> > >
> > >    2. LogStore writer - external or optionally derived[3] from
> FileStore
> > >
> > >    snapshots when changelog-producer != Input.
> > >
> > >    3. Write coordinator - is required for conflict resolution. This
> role
> > >
> > >    currently requires the external Catalog Lock (disabled by default,
> > >
> > >    configured by lock.enabled, the only available implementation is
> > >
> > >    HiveCatalogLock[4]) to guarantee Snapshot Isolation (SI); without
> this
> > >
> > >    lock, the FileStore writer can’t atomically (CAS) install the new
> > >
> > > snapshot
> > >
> > >    without the risk of overwriting concurrently committed one. The
> > >
> > > coordinator
> > >
> > >    is currently implemented and combined with the FileStore writer.
> > >
> > >
> > >
> > >
> > >
> > >
> https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/83a8b3299cefe6a5f0d099f0e0e074b73594bfbc/paimon-write-path.png
> > >
> > >
> > > I propose to change these roles such that:
> > >
> > >
> > >    - Write order in LogStore is “the source of truth". The default
> > >
> > >    implementation will write the change log and the set of keys to
> resolve
> > >
> > > SI
> > >
> > >    conflicts.
> > >
> > >    - Write coordinator is implemented in the LogStore instead of
> FileStore.
> > >
> > >    There are 2 implementations:
> > >
> > >    - "Internal changelog” (default) that guarantees Snapshot Isolation
> via
> > >
> > >       an optional Hive Catalog Lock.
> > >
> > >       - External logging systems that also acts as a coordinator and
> > >
> > >       provides guarantees specific to that system. For example, with
> > >
> > > Kafka as the
> > >
> > >       logging system, Kafka EXACTLY_ONCE transactions determine the
> > >
> > >       consistency semantics.
> > >
> > >    - In turn, FileStore writers read the change log and “materialize”
> it
> > >
> > >    into snapshots in the background. The FileStore writes can be done
> > >
> > > either
> > >
> > >    in a blocking way or as a standalone job, similar to compaction.
> > >
> > >    - In addition, we will need to adjust the read path. It will use the
> > >
> > >    latest “materialized” snapshots as a base and optionally apply a
> > >
> > >    non-yet-materialized changelog on top of it .
> > >
> > >
> > >
> > >
> > >
> > >
> https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/83a8b3299cefe6a5f0d099f0e0e074b73594bfbc/proposed-write-path.png
> > >
> > >
> > > With this change, we achieve several interesting properties, for
> example:
> > >
> > >
> > >    - Writes in LogStore are always consistent (as in, FileStore
> snapshots
> > >
> > >    become the actual snapshots of the LogStore at a certain offset)
> with
> > >
> > >    FileStore snapshots with the internal and external logging systems .
> > >
> > > This
> > >
> > >    property eliminates a whole class of anomalies caused by FileStore
> and
> > >
> > >    LogStore not agreeing on the write order and visibility.
> > >
> > >    - The actual consistency guarantees (consistency model of the
> > >
> > >    distributed system) offered by Apache Paimon depend on the LogStore
> > >
> > >    implementation. While the default LogStore implementation still
> offers
> > >
> > > SI,
> > >
> > >    the Kafka-based implementation will offer weaker consistency
> guarantees
> > >
> > >    that might be useful for cases with higher contention.
> > >
> > >    - We can use Kafka as the external log system and the coordinator to
> > >
> > >    enable non-Flink/Spark consumers that read directly from Kafka.
> > >
> > >    - Even though I talked about Kafka here, it will be easier to plug
> in
> > >
> > >    other external logging systems, allowing other logging sinks and
> > >
> > >    consistency models.
> > >
> > >
> > >
> > > Please let me know what you think about it. I am aware that this
> sketch of
> > >
> > > a proposal implies quite a redesign of the write and read paths, but,
> in my
> > >
> > > opinion, there is no easier way to solve consistency issues between
> > >
> > > FileStore and external LogStores and make Apache Paimon’s consistency
> model
> > >
> > > extensible.
> > >
> > >
> > >
> > >    1.
> > >
> > > https://paimon.apache.org/docs/master/concepts/primary-key-table/#none
> > >
> > >    2.
> > >
> > >
> > >
> > >
> https://github.com/apache/incubator-paimon/blob/4a56565ea9bd30efabefd09734432d34c48aff22/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java#L506C15-L514
> > >
> > >    3.
> > >
> > >
> > >
> > >
> https://paimon.apache.org/docs/master/concepts/primary-key-table/#changelog-producers
> > >
> > >    4.
> > >
> > >
> > >
> > >
> https://github.com/apache/incubator-paimon/blob/4a56565ea9bd30efabefd09734432d34c48aff22/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java#L44
> > >
> > >
> > >
> > > Best,
> > >
> > > Alex
> > >
> > >
> > > On Mar 31, 2023 at 04:49:15, Shammon FY <[email protected]> wrote:
> > >
> > >
> > > > Hi Alexander and Jingsong
> > >
> > > >
> > >
> > > > Sorry for joining the discussion late. I think @Alexander raise an
> > >
> > > > important question: the data consistency between log system and file
> > >
> > > store
> > >
> > > > in Paimon. In fact it also confuses me.
> > >
> > > >
> > >
> > > > In addition to multiple jobs writing to the same table, even when a
> job
> > >
> > > > writes to a table, there will be different data in log system and
> file
> > >
> > > > store after the job failover.
> > >
> > > >
> > >
> > > > As @Jingsong mentioned above, `external log system as the binlog
> storage
> > >
> > > of
> > >
> > > > the store`, but the file store won't recover from log system. It
> recovers
> > >
> > > > from `checkpoint` in upstream streaming process system which will
> cause
> > >
> > > the
> > >
> > > > data gap between log system and file store.
> > >
> > > >
> > >
> > > > So my question is: do we really need a log system? If so, is it
> necessary
> > >
> > > > to ensure data consistency between log system and file store? For
> > >
> > > example,
> > >
> > > > rollback the uncommit data of log system according to the file store?
> > >
> > > >
> > >
> > > > If users don't care the `wrong data` in log system, should Paimon
> support
> > >
> > > > users reading 'uncommitted' data in the future to support millisecond
> > >
> > > level
> > >
> > > > delays without log system?
> > >
> > > >
> > >
> > > > Hope to hear from you, thanks
> > >
> > > >
> > >
> > > > Best,
> > >
> > > > Shammon FY
> > >
> > > >
> > >
> > > >
> > >
> > > > On Tue, Mar 28, 2023 at 3:27 PM Jingsong Li <[email protected]>
> > >
> > > > wrote:
> > >
> > > >
> > >
> > > > Hi Alexander,
> > >
> > > >
> > >
> > > >
> > >
> > > > Thanks for starting this discussion.
> > >
> > > >
> > >
> > > >
> > >
> > > > Your analysis is right, currently kafka integration is having such
> > >
> > > >
> > >
> > > > inconsistency issues.
> > >
> > > >
> > >
> > > > You can share your thoughts on this.
> > >
> > > >
> > >
> > > >
> > >
> > > > I'll share my thoughts here:
> > >
> > > >
> > >
> > > >
> > >
> > > > The external log system makes a lot of sense, but currently there are
> > >
> > > >
> > >
> > > > some usability issues.
> > >
> > > >
> > >
> > > >
> > >
> > > > The kafka transaction confuses me. If the kafka transaction is turned
> > >
> > > >
> > >
> > > > on, then why not just read the incremental data from the FileStore,
> > >
> > > >
> > >
> > > > there is not much difference in their latency.
> > >
> > > >
> > >
> > > >
> > >
> > > > So my idea is external log system as the binlog storage of the store,
> > >
> > > >
> > >
> > > > its data should be written into by the store, and it can make the
> data
> > >
> > > >
> > >
> > > > visible in advance to reduce the latency, so that paimon really
> > >
> > > >
> > >
> > > > provides millisecond-level l-stream read latency.
> > >
> > > >
> > >
> > > > There is still a lot of design and advancement missing here.
> > >
> > > >
> > >
> > > >
> > >
> > > > Best,
> > >
> > > >
> > >
> > > > Jingsong
> > >
> > > >
> > >
> > > >
> > >
> > > > On Tue, Mar 28, 2023 at 10:44 AM Alexander Sorokoumov
> > >
> > > >
> > >
> > > > <[email protected]> wrote:
> > >
> > > >
> > >
> > > > >
> > >
> > > >
> > >
> > > > > Hi Paimon community,
> > >
> > > >
> > >
> > > > >
> > >
> > > >
> > >
> > > > > Congratulations on starting the incubating project!
> > >
> > > >
> > >
> > > > >
> > >
> > > >
> > >
> > > > > I have a question about preserving the write order and visibility
> > >
> > > between
> > >
> > > >
> > >
> > > > > LogStore and FileStore when LogStore is configured as an external
> > >
> > > system,
> > >
> > > >
> > >
> > > > > e.g., Kafka. To my knowledge, Paimon does not coordinate writes to
> > >
> > > >
> > >
> > > > LogStore
> > >
> > > >
> > >
> > > > > and FileStore between multiple Flink jobs writing to the same
> table.
> > >
> > > The
> > >
> > > >
> > >
> > > > > only check I am aware of is Snapshot Isolation key range check for
> > >
> > > >
> > >
> > > > > concurrent snapshot commits.
> > >
> > > >
> > >
> > > > >
> > >
> > > >
> > >
> > > > > If the assumptions above are correct, then the record order in
> Kafka
> > >
> > > does
> > >
> > > >
> > >
> > > > > not really correspond to the order in FileStore’s snapshots. In
> > >
> > > addition,
> > >
> > > >
> > >
> > > > > Kafka topics have different visibility guarantees under
> EXACTLY_ONCE,
> > >
> > > >
> > >
> > > > > compared to FileStore. As a motivating example, consider the
> following
> > >
> > > >
> > >
> > > > > setup:
> > >
> > > >
> > >
> > > > >
> > >
> > > >
> > >
> > > > >    1. Paimon table configured with Kafka as an external log
> system. The
> > >
> > > >
> > >
> > > > >    change log topic t1 has 1 partition.
> > >
> > > >
> > >
> > > > >    2. There are 2 concurrent Flink jobs writing to the same table
> > >
> > > >
> > >
> > > > >    non-intersecting key ranges.  Job1 writes keys {x1-x18}, Job2
> writes
> > >
> > > >
> > >
> > > > keys
> > >
> > > >
> > >
> > > > >    {y1-y4}.
> > >
> > > >
> > >
> > > > >    3. Both jobs write with EXACTLY_ONCE delivery guarantee, so
> there
> > >
> > > are
> > >
> > > >
> > >
> > > > 2
> > >
> > > >
> > >
> > > > >    Kafka transactions - txn1 for Job1 and txn2 for Job2.
> > >
> > > >
> > >
> > > > >    4. The final records order in t1 is [x1, x2, x3, x4, y1, y2,
> x5, x6,
> > >
> > > >
> > >
> > > > x7,
> > >
> > > >
> > >
> > > > >    y3, y4, x8].
> > >
> > > >
> > >
> > > > >    5. Snapshot1 of Job1 is concurrent to Snapshot2 of Job2.
> Snapshot1
> > >
> > > >
> > >
> > > > will
> > >
> > > >
> > >
> > > > >    commit first, Snapshot2 will commit second.
> > >
> > > >
> > >
> > > > >    6. See illustration for clarifications -
> > >
> > > >
> > >
> > > > >
> > >
> > > >
> > >
> > > >
> > >
> > > >
> > >
> > >
> > >
> https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/f6b763c70e0ddbfaabab5806ceaf6cb8c13c1fcd/concurrent-writers.png
> > >
> > > >
> > >
> > > > >    .
> > >
> > > >
> > >
> > > > >
> > >
> > > >
> > >
> > > > >
> > >
> > > >
> > >
> > > > >  In this setup, the following anomalies are possible:
> > >
> > > >
> > >
> > > > >
> > >
> > > >
> > >
> > > > >    1. Consider a continuous streaming query submitted after
> Snapshot 1
> > >
> > > >
> > >
> > > > >    committed, before Snapshot 2 committed. Such a query  returns
> > >
> > > >
> > >
> > > > Snapshot 1
> > >
> > > >
> > >
> > > > >    and then streams newer records. The snapshot will contain
> records
> > >
> > > >
> > >
> > > > {x1-x8}
> > >
> > > >
> > >
> > > > >    and won’t contain {y1-y2}. The streaming results would tail
> records
> > >
> > > >
> > >
> > > > after
> > >
> > > >
> > >
> > > > >    the last covered offset, so after 12 . As a result, such query
> won’t
> > >
> > > >
> > >
> > > > ever
> > >
> > > >
> > >
> > > > >    return records from txn2.
> > >
> > > >
> > >
> > > > >    2. Consider a batch query submitted after Snapshot 1 is
> committed,
> > >
> > > >
> > >
> > > > >    before Snapshot 2 is committed. Such a query would return
> {x1-x8}.
> > >
> > > >
> > >
> > > > However,
> > >
> > > >
> > >
> > > > >    a following streaming query reading from offset 1 would not
> return
> > >
> > > any
> > >
> > > >
> > >
> > > > >    results until Snapshot 2 commits. This would happen because the
> > >
> > > >
> > >
> > > > streaming
> > >
> > > >
> > >
> > > > >    query would read from the Kafka topic under READ_COMMITTED. As
> txn1
> > >
> > > >
> > >
> > > > and
> > >
> > > >
> > >
> > > > >    txn2 overlap, txn1 records are not visible until txn2 commits.
> > >
> > > >
> > >
> > > > >
> > >
> > > >
> > >
> > > > >
> > >
> > > >
> > >
> > > > >
> > >
> > > >
> > >
> > > > > I think I have some ideas about this, but I'm worried the scope
> might
> > >
> > > be
> > >
> > > >
> > >
> > > > > too large, and I'm not sure how important external log systems are
> to
> > >
> > > the
> > >
> > > >
> > >
> > > > > Paimon roadmap.
> > >
> > > >
> > >
> > > > >
> > >
> > > >
> > >
> > > > > Best,
> > >
> > > >
> > >
> > > > > Alexander
> > >
> > > >
> > >
> > > >
> > >
> > > >
> > >
> > >
> > >
>

Reply via email to