Hi Jingsong, Shammon,

Please find below my thoughts about external log systems, their value, and
a sketch of what a systematic fix should look like. I will happily write a
more detailed proposal (PIP?) if the community finds this sketch
attractive.

This answer turned out to be rather long, so to sum up my points, I think
that:

   1. External log systems enable a variety of valuable use cases, and
   2. The way to correctly enable them is to redesign the write path such
   that the LogStore becomes the source of truth and the coordinator on the
   write path.


*External log systems enable a variety of valuable use cases.*

Here are reasons why I think external log systems are valuable:

   1. With an explicitly stored log, we don’t need the normalization node
   if the downstream consumer does not need retractions or the stream already
   contains them[1]. Having this option enables more flexibility around
   performance and functionality.
   2. External log systems enable external consumers that can read the
   change log without Flink/Spark jobs. Integrations with external sinks are
   advantageous for Paimon as a streaming data lake.
   3. Finally, this is an opportunity for Paimon to support different
   isolation/consistency models via pluggable external log systems or
   coordinators. The rest of this message elaborates on this point.


*The way to correctly enable them is to redesign the write path such that
the LogStore becomes the source of truth and the coordinator on the write
path.*

First, let’s talk about the root cause of the problem at hand. Paimon’s
dual write path coordinates concurrent writes in the FileStore Commit
phase. FileStoreCommit creates a totally ordered sequence (by monotonically
increasing snapshot IDs) out of concurrently running non-conflicting
snapshots by[2]:

   1. Observing the concurrently committed snapshots, then
   2. Verifying that the current job can proceed because there is no
   conflict.


This way, FileStoreCommit orders concurrent snapshots one after another if
it does not find conflicting key ranges, or aborts the conflicting snapshots
if it does. As all changes that belong to a single snapshot are grouped
together in, well, that snapshot, FileStoreCommit effectively reorders the
write sequence compared to the “original” write order that happened in the
write phase. In the illustration below, the LogStore has records in the
original order, which does not agree with the order in the FileStore:

https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/f6b763c70e0ddbfaabab5806ceaf6cb8c13c1fcd/concurrent-writers.png

In the log order, writes from txn1 and txn2 interleave. In the FileStore
order, all writes from txn1 (grey) happened before all writes from txn2
(red).

By the commit phase, it is generally too late for the external LogStore to
change the write order, as it has been appending records during the write
phase; it can only abort or commit. *As a result, FileStore snapshots can
become inconsistent
with writes in the LogStore.* In other words, a reader that consumes the
LogStore up to the topic offset specified in the FileStore snapshot might
not be able to reproduce that snapshot.
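
A toy Python sketch of this mismatch, using the record order from the
example earlier in this thread. It is only an illustration: the helper
names (txn_of, file_store_order) and the 0-based log positions are made up
for this sketch and are not Paimon or Kafka APIs.

```python
# Records as appended to the single-partition log during the write phase.
LOG_ORDER = ["x1", "x2", "x3", "x4", "y1", "y2",
             "x5", "x6", "x7", "y3", "y4", "x8"]


def txn_of(record):
    """Hypothetical helper: x* records belong to txn1, y* records to txn2."""
    return "txn1" if record.startswith("x") else "txn2"


def file_store_order(log, commit_order):
    """Group records per transaction, in snapshot commit order."""
    return [r for txn in commit_order for r in log if txn_of(r) == txn]


# Snapshot 1 (txn1) commits first, snapshot 2 (txn2) second.
fs_order = file_store_order(LOG_ORDER, ["txn1", "txn2"])

# Snapshot 1 holds only txn1 records; find the log position of its last one.
snapshot1 = [r for r in LOG_ORDER if txn_of(r) == "txn1"]
last_covered = max(i for i, r in enumerate(LOG_ORDER) if txn_of(r) == "txn1")

# Replaying the log up to that position also picks up txn2 records.
replayed = LOG_ORDER[: last_covered + 1]
extra = [r for r in replayed if r not in snapshot1]

print(fs_order)  # all x records first, then all y records
print(extra)     # records the replay sees but snapshot 1 does not contain
```

In this sketch fs_order differs from LOG_ORDER even though both hold the
same records, and replaying the log up to the last position covered by
snapshot 1 yields the txn2 records as well, so the replay does not
reproduce snapshot 1.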

To make my argument more specific, for the next couple of paragraphs, I
will assume that the external logging system is Kafka, configured with
EXACTLY_ONCE.

If we look at Apache Paimon’s write path as it is right now, there are 3
conceptual roles:

   1. FileStore writer - the current “source of truth”.
   2. LogStore writer - external, or optionally derived[3] from FileStore
   snapshots when changelog-producer != Input.
   3. Write coordinator - required for conflict resolution. This role
   currently requires the external Catalog Lock (disabled by default,
   configured by lock.enabled; the only available implementation is
   HiveCatalogLock[4]) to guarantee Snapshot Isolation (SI); without this
   lock, the FileStore writer can’t atomically (CAS) install the new snapshot
   without the risk of overwriting a concurrently committed one. The
   coordinator is currently implemented as part of the FileStore writer.


https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/83a8b3299cefe6a5f0d099f0e0e074b73594bfbc/paimon-write-path.png
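
For reference, the coordinator role described above can be modeled roughly
as follows. This is a simplified sketch, not Paimon's actual
FileStoreCommit code: the SnapshotLog class, its try_commit method, and the
use of key sets instead of key ranges are assumptions made for
illustration, and a threading.Lock stands in for the external catalog lock.

```python
import threading


class SnapshotLog:
    """Hypothetical in-memory model of a totally ordered snapshot sequence."""

    def __init__(self):
        self._lock = threading.Lock()  # stand-in for an external catalog lock
        self._snapshots = []           # _snapshots[i] holds keys of snapshot i + 1

    def latest_id(self):
        return len(self._snapshots)

    def try_commit(self, base_id, keys):
        """Install a new snapshot, or return None if it conflicts.

        base_id is the latest snapshot ID the writer observed when it started.
        """
        keys = frozenset(keys)
        with self._lock:
            # 1. Observe the snapshots committed concurrently since base_id.
            concurrent = self._snapshots[base_id:]
            # 2. Verify that none of them touched the same keys.
            if any(keys & other for other in concurrent):
                return None  # conflict: this snapshot is aborted
            # The lock makes "check, then install" atomic, so a concurrently
            # committed snapshot cannot be overwritten.
            self._snapshots.append(keys)
            return len(self._snapshots)


log = SnapshotLog()
base = log.latest_id()                      # all writers start from the same base
id1 = log.try_commit(base, {"x1", "x2"})    # commits as snapshot 1
id2 = log.try_commit(base, {"y1"})          # disjoint keys: ordered after snapshot 1
id3 = log.try_commit(base, {"x2", "z1"})    # overlaps with snapshot 1: aborted
```

Without the lock, two writers could both pass the conflict check and then
install the same snapshot ID, which is the overwrite risk mentioned in
point 3.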

I propose to change these roles such that:

   - Write order in LogStore is “the source of truth”. The default
   implementation will write the change log and the set of keys to resolve SI
   conflicts.
   - Write coordinator is implemented in the LogStore instead of FileStore.
   There are 2 implementations:
      - “Internal changelog” (default) that guarantees Snapshot Isolation via
      an optional Hive Catalog Lock.
      - External logging systems that also act as a coordinator and provide
      guarantees specific to that system. For example, with Kafka as the
      logging system, Kafka EXACTLY_ONCE transactions determine the
      consistency semantics.
   - In turn, FileStore writers read the change log and “materialize” it
   into snapshots in the background. The FileStore writes can be done either
   in a blocking way or as a standalone job, similar to compaction.
   - In addition, we will need to adjust the read path. It will use the
   latest “materialized” snapshots as a base and optionally apply a
   not-yet-materialized changelog on top of it.


https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/83a8b3299cefe6a5f0d099f0e0e074b73594bfbc/proposed-write-path.png
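
A minimal Python sketch of the proposed roles, assuming an in-memory list
as the log and a dict as the materialized snapshot. All names here (Table,
apply_change, materialize, read) are hypothetical and only illustrate the
idea; they are not a proposed API.

```python
from dataclasses import dataclass, field


def apply_change(state, key, value):
    """Apply one change-log record; a value of None means delete."""
    if value is None:
        state.pop(key, None)
    else:
        state[key] = value


@dataclass
class Table:
    log: list = field(default_factory=list)       # source of truth, in write order
    snapshot: dict = field(default_factory=dict)  # latest materialized snapshot
    materialized_offset: int = 0                  # log records already in snapshot

    def write(self, key, value):
        # Writers only append to the log; its order defines the write order.
        self.log.append((key, value))

    def materialize(self):
        # Background step: fold not-yet-materialized records into the snapshot.
        for key, value in self.log[self.materialized_offset:]:
            apply_change(self.snapshot, key, value)
        self.materialized_offset = len(self.log)

    def read(self, include_unmaterialized=True):
        # Read path: materialized snapshot as a base, log tail applied on top.
        view = dict(self.snapshot)
        if include_unmaterialized:
            for key, value in self.log[self.materialized_offset:]:
                apply_change(view, key, value)
        return view


table = Table()
table.write("a", 1)
table.write("b", 2)
table.materialize()        # snapshot now reflects log offset 2
table.write("a", 3)
table.write("b", None)     # delete, not yet materialized

fresh = table.read()                              # snapshot + log tail
stale = table.read(include_unmaterialized=False)  # snapshot only
```

By construction, the snapshot in this sketch is always the result of
replaying the log up to materialized_offset, which is the consistency
property the proposal is after.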

With this change, we achieve several interesting properties, for example:

   - Writes in LogStore are always consistent with FileStore snapshots (as
   in, FileStore snapshots become the actual snapshots of the LogStore at a
   certain offset), with both the internal and external logging systems. This
   property eliminates a whole class of anomalies caused by FileStore and
   LogStore not agreeing on the write order and visibility.
   - The actual consistency guarantees (consistency model of the
   distributed system) offered by Apache Paimon depend on the LogStore
   implementation. While the default LogStore implementation still offers SI,
   the Kafka-based implementation will offer weaker consistency guarantees
   that might be useful for cases with higher contention.
   - We can use Kafka as the external log system and the coordinator to
   enable non-Flink/Spark consumers that read directly from Kafka.
   - Even though I talked about Kafka here, it will be easier to plug in
   other external logging systems, allowing other logging sinks and
   consistency models.


Please let me know what you think about it. I am aware that this sketch of
a proposal implies quite a redesign of the write and read paths, but, in my
opinion, there is no easier way to solve consistency issues between
FileStore and external LogStores and make Apache Paimon’s consistency model
extensible.


   1. https://paimon.apache.org/docs/master/concepts/primary-key-table/#none
   2. https://github.com/apache/incubator-paimon/blob/4a56565ea9bd30efabefd09734432d34c48aff22/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java#L506C15-L514
   3. https://paimon.apache.org/docs/master/concepts/primary-key-table/#changelog-producers
   4. https://github.com/apache/incubator-paimon/blob/4a56565ea9bd30efabefd09734432d34c48aff22/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java#L44


Best,
Alex

On Mar 31, 2023 at 04:49:15, Shammon FY <[email protected]> wrote:

> Hi Alexander and Jingsong
>
> Sorry for joining the discussion late. I think @Alexander raises an
> important question: the data consistency between the log system and the
> file store in Paimon. In fact, it confuses me as well.
>
> In addition to the case of multiple jobs writing to the same table, even
> when a single job writes to a table, the log system and the file store can
> hold different data after a job failover.
>
> As @Jingsong mentioned above, `external log system as the binlog storage of
> the store`, but the file store won't recover from the log system. It
> recovers from a `checkpoint` in the upstream stream processing system,
> which causes a data gap between the log system and the file store.
>
> So my question is: do we really need a log system? If so, is it necessary
> to ensure data consistency between the log system and the file store? For
> example, by rolling back the uncommitted data in the log system according
> to the file store?
>
> If users don't care about the `wrong data` in the log system, should Paimon
> support users reading 'uncommitted' data in the future, to provide
> millisecond-level delays without a log system?
>
> Hope to hear from you, thanks
>
> Best,
> Shammon FY
>
>
> On Tue, Mar 28, 2023 at 3:27 PM Jingsong Li <[email protected]>
> wrote:
>
> Hi Alexander,
>
> Thanks for starting this discussion.
>
> Your analysis is right, currently kafka integration is having such
> inconsistency issues.
> You can share your thoughts on this.
>
> I'll share my thoughts here:
>
> The external log system makes a lot of sense, but currently there are
> some usability issues.
>
> The kafka transaction confuses me. If the kafka transaction is turned
> on, then why not just read the incremental data from the FileStore,
> there is not much difference in their latency.
>
> So my idea is external log system as the binlog storage of the store,
> its data should be written into by the store, and it can make the data
> visible in advance to reduce the latency, so that paimon really
> provides millisecond-level l-stream read latency.
> There is still a lot of design and advancement missing here.
>
> Best,
> Jingsong
>
> On Tue, Mar 28, 2023 at 10:44 AM Alexander Sorokoumov
> <[email protected]> wrote:
>
> > Hi Paimon community,
> >
> > Congratulations on starting the incubating project!
> >
> > I have a question about preserving the write order and visibility between
> > LogStore and FileStore when LogStore is configured as an external system,
> > e.g., Kafka. To my knowledge, Paimon does not coordinate writes to LogStore
> > and FileStore between multiple Flink jobs writing to the same table. The
> > only check I am aware of is Snapshot Isolation key range check for
> > concurrent snapshot commits.
> >
> > If the assumptions above are correct, then the record order in Kafka does
> > not really correspond to the order in FileStore’s snapshots. In addition,
> > Kafka topics have different visibility guarantees under EXACTLY_ONCE,
> > compared to FileStore. As a motivating example, consider the following
> > setup:
> >
> >    1. Paimon table configured with Kafka as an external log system. The
> >    change log topic t1 has 1 partition.
> >    2. There are 2 concurrent Flink jobs writing to the same table
> >    non-intersecting key ranges. Job1 writes keys {x1-x18}, Job2 writes keys
> >    {y1-y4}.
> >    3. Both jobs write with EXACTLY_ONCE delivery guarantee, so there are 2
> >    Kafka transactions - txn1 for Job1 and txn2 for Job2.
> >    4. The final records order in t1 is [x1, x2, x3, x4, y1, y2, x5, x6, x7,
> >    y3, y4, x8].
> >    5. Snapshot1 of Job1 is concurrent to Snapshot2 of Job2. Snapshot1 will
> >    commit first, Snapshot2 will commit second.
> >    6. See illustration for clarifications -
> >    https://gist.githubusercontent.com/Gerrrr/fffcbc43431595fd4931869c9ba51e8b/raw/f6b763c70e0ddbfaabab5806ceaf6cb8c13c1fcd/concurrent-writers.png
> >    .
> >
> > In this setup, the following anomalies are possible:
> >
> >    1. Consider a continuous streaming query submitted after Snapshot 1
> >    committed, before Snapshot 2 committed. Such a query returns Snapshot 1
> >    and then streams newer records. The snapshot will contain records {x1-x8}
> >    and won’t contain {y1-y2}. The streaming results would tail records after
> >    the last covered offset, so after 12. As a result, such query won’t ever
> >    return records from txn2.
> >    2. Consider a batch query submitted after Snapshot 1 is committed,
> >    before Snapshot 2 is committed. Such a query would return {x1-x8}. However,
> >    a following streaming query reading from offset 1 would not return any
> >    results until Snapshot 2 commits. This would happen because the streaming
> >    query would read from the Kafka topic under READ_COMMITTED. As txn1 and
> >    txn2 overlap, txn1 records are not visible until txn2 commits.
> >
> > I think I have some ideas about this, but I'm worried the scope might be
> > too large, and I'm not sure how important external log systems are to the
> > Paimon roadmap.
> >
> > Best,
> > Alexander
