This sounds interesting ... I will take a closer look and give my comments
later.

At the same time, would you mind creating a wiki page for your idea?
You can add your wiki page under
https://cwiki.apache.org/confluence/display/DL/Project+Proposals

You might need to ask on the dev list for wiki edit permissions once you
have a wiki account.

- Sijie


On Mon, Sep 12, 2016 at 2:20 AM, Xi Liu <xi.liu....@gmail.com> wrote:

> Hello,
>
> I asked about transaction support in the distributedlog user group two
> months ago. I want to raise this again, as we are looking to use
> distributedlog to build a transactional data service. Transaction support
> is a major feature that is missing in distributedlog. We have some ideas
> for adding it and want to know whether they make sense. If they do, we'd
> like to contribute and develop them with the community.
>
> Here are the thoughts:
>
> -------------------------------------------------
>
> From our understanding, DL can provide "at-least-once" delivery semantics
> (if not, please correct me) but not "exactly-once" delivery semantics.
> That means a message can be delivered one or more times, so the reader
> has to handle duplicates itself.
>
> The duplicates come from two places: the writer side (this assumes using
> the write proxy, not the core library) and the reader side.
>
> - writer side: if the client attempts to write a record to the write
> proxies, gets a network error (e.g. a timeout), and then retries, the
> retry can result in duplicates.
> - reader side: if the reader reads a message from a stream and then
> crashes, it restarts from the last known position (DLSN). If the reader
> fails after processing a record but before recording the position, the
> processed record will be delivered again.
>
> The reader problem can be properly addressed by making use of the sequence
> numbers of records and doing proper checkpointing. For example, a database
> can checkpoint the indexed data with the sequence numbers of the records;
> Flink can checkpoint its state with the sequence numbers.
>
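
A minimal sketch of the checkpointing idea described above, assuming the
state and the position are persisted together. The class
`CheckpointingConsumer` and its methods are hypothetical, and a plain
`long` sequence number stands in for a DLSN; none of this is an existing
DistributedLog API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reader-side consumer: state and position are checkpointed together. */
final class CheckpointingConsumer {
    /** Snapshot of the state plus the sequence number of the last record applied to it. */
    static final class Checkpoint {
        final long lastAppliedSeq;
        final List<String> state;

        Checkpoint(long lastAppliedSeq, List<String> state) {
            this.lastAppliedSeq = lastAppliedSeq;
            this.state = new ArrayList<>(state);
        }
    }

    private long lastAppliedSeq;
    private final List<String> state;

    /** Restores the consumer from a previously taken checkpoint. */
    CheckpointingConsumer(Checkpoint restored) {
        this.lastAppliedSeq = restored.lastAppliedSeq;
        this.state = new ArrayList<>(restored.state);
    }

    /** Applies a record unless the restored state already covers its sequence number. */
    boolean apply(long seq, String payload) {
        if (seq <= lastAppliedSeq) {
            return false; // redelivered after a restart: skip it
        }
        state.add(payload);
        lastAppliedSeq = seq;
        return true;
    }

    /** Captures state and position in one snapshot (assumed to be persisted atomically). */
    Checkpoint checkpoint() {
        return new Checkpoint(lastAppliedSeq, state);
    }

    List<String> state() {
        return new ArrayList<>(state);
    }
}
```

Because the position travels with the state, a record that is redelivered
after a crash is recognized by its sequence number and skipped.
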
> The writer problem can be addressed by implementing an idempotent writer.
> However, an alternative and more powerful approach is to support
> transactions.
>
> *What does transaction mean?*
>
> A transaction means a collection of records can be written transactionally
> within a stream or across multiple streams. They will be consumed by the
> reader together when a transaction is committed, or will never be consumed
> by the reader when the transaction is aborted.
>
> Transactions will provide the following guarantees:
>
> - The reader should not be exposed to records written from uncommitted
> transactions (mandatory)
> - The reader should consume the records in the transaction commit order
> rather than the record written order (mandatory)
> - No duplicated records within a transaction (mandatory)
> - Allow interleaving transactional writes and non-transactional writes
> (optional)
>
> *Stream Transaction & Namespace Transaction*
>
> There will be two types of transactions: stream-level transactions (local
> transactions) and namespace-level transactions (global transactions).
>
> A stream-level transaction is a transactional write of records to one
> stream; a namespace-level transaction is a transactional write of records
> to multiple streams.
>
> *Implementation Thoughts*
>
> - A transaction consists of a begin control record, a series of data
> records, and a commit/abort control record.
> - The begin/commit/abort control records are written to a `commit` log
> stream, while the data records are written to normal data log streams.
> - For stream-level transactions, the `commit` log stream is the data log
> stream itself; for namespace-level transactions, it is a *system* stream
> (or multiple system streams).
> - The transaction code would look like this:
>
> <code>
>
> // Proposed API (does not exist yet); stream names are passed as strings.
> Transaction txn = client.transaction();
> Future<DLSN> result1 = txn.write("stream-0", record);
> Future<DLSN> result2 = txn.write("stream-1", record);
> Future<DLSN> result3 = txn.write("stream-2", record);
> Future<Pair<DLSN, DLSN>> result = txn.commit();
>
> </code>
>
> If the txn is committed, all the write futures will be satisfied with
> their written DLSNs. If the txn is aborted, all the write futures will
> fail together. There is no partial failure state.
>
> - The actual data flow will be:
>
> 1. the writer gets a transaction id from the owner of the `commit` log
> stream.
> 2. the writer writes the begin control record (synchronously) with the
> transaction id.
> 3. each write within the same txn is assigned a local sequence number
> starting from 0. The combination of transaction id and local sequence
> number will be used later on by the readers to de-duplicate records.
> 4. the commit/abort control record is written based on the results from
> step 3.
>
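
The de-duplication in the data flow above could be sketched as follows,
assuming the reader keeps the set of (transaction id, local sequence
number) pairs it has already seen. `TxnRecordDeduplicator` and `RecordKey`
are hypothetical names, and the key types (`long`, `int`) are assumptions.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical reader-side filter keyed by (transaction id, local sequence number). */
final class TxnRecordDeduplicator {
    /** Identity of a record inside a transaction. */
    static final class RecordKey {
        final long txnId;
        final int localSeq;

        RecordKey(long txnId, int localSeq) {
            this.txnId = txnId;
            this.localSeq = localSeq;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof RecordKey)) {
                return false;
            }
            RecordKey other = (RecordKey) o;
            return txnId == other.txnId && localSeq == other.localSeq;
        }

        @Override
        public int hashCode() {
            return Objects.hash(txnId, localSeq);
        }
    }

    private final Set<RecordKey> seen = new HashSet<>();

    /** Returns true the first time a key is seen, false for a retried duplicate. */
    boolean firstDelivery(long txnId, int localSeq) {
        return seen.add(new RecordKey(txnId, localSeq));
    }

    /** Drops the bookkeeping for a transaction once it is committed or aborted. */
    void forget(long txnId) {
        seen.removeIf(key -> key.txnId == txnId);
    }
}
```

A writer that retries a record after a timeout reuses the same pair, so
the second copy is filtered out without any coordination with the writer.
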
> - The application can supply a timeout when it begins a transaction
> (#begin()). The owner of the `commit` log stream can abort transactions
> that are neither committed nor aborted within their timeout.
>
> - Failures:
>
> * all log records can simply be retried, as they will be de-duplicated,
> probably at the reader side.
>
> - Reader:
>
> * The reader can be configured to read uncommitted records or committed
> records only (by default it reads uncommitted records).
> * If the reader is configured to read committed records only, the
> read-ahead cache will be changed to maintain one additional map of pending
> committed records. The pending committed records map is bounded, and
> records will be dropped as read-ahead moves on.
> * When the reader hits a commit record, it will rewind to the begin record
> and start reading from there. By leveraging the read-ahead cache and the
> pending committed records cache, this should work well for both short
> transactions and long transactions.
>
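
A simplified sketch of the committed-only read mode above, assuming the
reader buffers data records per transaction and releases them when the
commit control record arrives. `CommittedOnlyBuffer` and its callbacks are
hypothetical. Unlike the proposal, this sketch keeps an unbounded
in-memory map and never rewinds to the begin record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical buffer that exposes records only after their transaction commits. */
final class CommittedOnlyBuffer {
    // Records of transactions that have begun but are not yet committed or aborted.
    private final Map<Long, List<String>> pending = new HashMap<>();
    // Records released to the application, in transaction commit order.
    private final List<String> delivered = new ArrayList<>();

    void onBegin(long txnId) {
        pending.put(txnId, new ArrayList<>());
    }

    void onData(long txnId, String payload) {
        List<String> records = pending.get(txnId);
        if (records != null) {
            records.add(payload); // hold back until the outcome is known
        }
    }

    void onCommit(long txnId) {
        List<String> records = pending.remove(txnId);
        if (records != null) {
            delivered.addAll(records); // released at commit time
        }
    }

    void onAbort(long txnId) {
        pending.remove(txnId); // records of aborted transactions are never exposed
    }

    List<String> delivered() {
        return new ArrayList<>(delivered);
    }
}
```

Records come out in commit order rather than write order, and records of
aborted transactions never reach the application, which matches the two
mandatory reader guarantees listed earlier.
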
> - DLSN, SequenceId:
>
> * We will add a fourth field to DLSN: the `local sequence number` within
> a transaction session. The new DLSN of a record in a transaction will be
> the DLSN of the commit control record plus the record's local sequence
> number.
> * The sequence id will still be the position of the commit record plus
> the record's local sequence number. On writing the commit control record,
> the position will be advanced by the total number of written records.
>
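
One way to picture the extended DLSN is a four-component position that is
compared field by field. `TxnDLSN` is a hypothetical class, and the names
of the first three fields are assumptions about the existing DLSN
components rather than a statement of the current layout.

```java
/** Hypothetical four-field position: commit record DLSN plus a local sequence number. */
final class TxnDLSN implements Comparable<TxnDLSN> {
    // Assumed names for the three existing DLSN components (of the commit control record).
    final long logSegmentSequenceNo;
    final long entryId;
    final long slotId;
    // Proposed fourth field: position of the record within its transaction.
    final int localSeq;

    TxnDLSN(long logSegmentSequenceNo, long entryId, long slotId, int localSeq) {
        this.logSegmentSequenceNo = logSegmentSequenceNo;
        this.entryId = entryId;
        this.slotId = slotId;
        this.localSeq = localSeq;
    }

    /** Orders by the commit record's position first, then by the local sequence number. */
    @Override
    public int compareTo(TxnDLSN other) {
        int c = Long.compare(logSegmentSequenceNo, other.logSegmentSequenceNo);
        if (c != 0) {
            return c;
        }
        c = Long.compare(entryId, other.entryId);
        if (c != 0) {
            return c;
        }
        c = Long.compare(slotId, other.slotId);
        if (c != 0) {
            return c;
        }
        return Integer.compare(localSeq, other.localSeq);
    }
}
```

With this ordering, all records of one transaction sort together at the
position of its commit record, and the local sequence number preserves
their order inside the transaction.
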
> - Transaction Group & Namespace Transaction
>
> Using one single log stream for namespace transactions can become a
> bottleneck, since all the begin/commit/abort control records have to go
> through that one log stream.
>
> The idea of a 'transaction group' is to allow partitioning the writers
> into different transaction groups.
>
> Clients can specify the `group-name` when starting a transaction. If no
> `group-name` is specified, the default `commit` log in the namespace is
> used for creating transactions.
>
> -------------------------------------------------
>
> I'd like to collect feedback on this idea. Any comments are appreciated,
> and if anyone is also interested in this idea, we'd like to collaborate
> with the community.
>
>
> - Xi
>
