Khurrum, thank you for your comment.

A few use cases for your reference.

- We partition the data by key (for example, user id). A single operation
may update several keys at the same time. We'd like the readers to see
those updates only once, with no partial failure state in between. A
single stream-level transaction cannot help in this case.
- Imagine two stream computing jobs. The first one reads from a set of
distributedlog streams and writes its computation results to another set
of distributedlog streams, which are the input for the second job. We use
another distributedlog stream to track the read DLSNs for the first set of
streams, and we want to make sure that updating the offset stream and
propagating the computation results into the streams of the second job
happen atomically.
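
To make the second use case more concrete, here is a minimal in-memory
sketch in Java. It is only a toy model of the all-or-nothing visibility we
are asking for: the Namespace and Txn classes, the processOne helper, and
the stream names "job2-input-0" and "job1-offsets" are all hypothetical and
are not part of distributedlog today.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a namespace-level transaction (not the distributedlog API). */
class NamespaceTxnSketch {

    /** Hypothetical namespace: stream name -> records visible to readers. */
    static final class Namespace {
        private final Map<String, List<String>> streams = new HashMap<>();

        /** Returns a snapshot of the records currently visible in a stream. */
        synchronized List<String> read(String stream) {
            List<String> records = streams.get(stream);
            return records == null ? new ArrayList<String>() : new ArrayList<String>(records);
        }

        Txn transaction() {
            return new Txn(this);
        }

        // Applies every buffered write under one lock, so a reader sees all or none.
        private synchronized void apply(List<String[]> writes) {
            for (String[] w : writes) {
                streams.computeIfAbsent(w[0], k -> new ArrayList<>()).add(w[1]);
            }
        }
    }

    /** Hypothetical transaction handle: buffers writes until commit or abort. */
    static final class Txn {
        private final Namespace ns;
        private final List<String[]> pending = new ArrayList<>();
        private boolean done = false;

        Txn(Namespace ns) {
            this.ns = ns;
        }

        void write(String stream, String record) {
            if (done) {
                throw new IllegalStateException("transaction already finished");
            }
            pending.add(new String[] {stream, record});
        }

        void commit() {
            if (done) {
                throw new IllegalStateException("transaction already finished");
            }
            done = true;
            ns.apply(pending);
        }

        void abort() {
            done = true;
            pending.clear(); // nothing was ever made visible
        }
    }

    /** One step of the first job: publish a result and the new read offset together. */
    static void processOne(Namespace ns, String inputDlsn, String result) {
        Txn txn = ns.transaction();
        txn.write("job2-input-0", result);    // input stream of the second job
        txn.write("job1-offsets", inputDlsn); // offset-tracking stream of the first job
        txn.commit();
    }
}
```

If the first job crashes before commit(), neither the result nor the offset
becomes visible, so after a restart it can simply reprocess the input
without the second job seeing a duplicate.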

Let me know your opinions. Appreciate your help.

- Xi

On Sun, Sep 18, 2016 at 10:25 AM, Khurrum Nasim <khurrumnas...@gmail.com>
wrote:

> Xi,
>
> The "stream level" transaction makes more sense to me. It is an extension
> of 'atomic write' records over the size limitation. I can see the value of
> having such ability.
>
> But I am not convinced by the namespace level transaction. Do you have any
> concrete use cases that you can talk about more?
>
> - KN
>
> On Mon, Sep 12, 2016 at 5:20 PM, Xi Liu <xi.liu....@gmail.com> wrote:
>
> > Hello,
> >
> > I asked about transaction support in the distributedlog user group two
> > months ago. I want to raise this again, as we are looking at using
> > distributedlog to build a transactional data service. It is a major
> > feature that is missing in distributedlog. We have some ideas for adding
> > it to distributedlog and want to know whether they make sense. If they
> > are good, we'd like to contribute and develop them with the community.
> >
> > Here are the thoughts:
> >
> > -------------------------------------------------
> >
> > From our understanding, DL can provide "at-least-once" delivery semantics
> > (if not, please correct me) but not "exactly-once" delivery semantics.
> > That means that a message can be delivered one or more times if the
> > reader doesn't handle duplicates.
> >
> > The duplicates come from two places: one is on the writer side (assuming
> > the write proxy is used rather than the core library), and the other is
> > on the reader side.
> >
> > - writer side: if the client attempts to write a record to the write
> > proxies, gets a network error (e.g. a timeout), and then retries, the
> > retry can result in duplicates.
> > - reader side: if the reader reads a message from a stream and then
> > crashes, it restarts from the last known position (DLSN). If the reader
> > fails after processing a record but before recording the position, the
> > processed record will be delivered again.
> >
> > The reader problem can be properly addressed by making use of the
> > sequence numbers of records and doing proper checkpointing. For example,
> > a database can checkpoint the indexed data together with the sequence
> > numbers of the records; Flink can checkpoint its state together with the
> > sequence numbers.
> >
> > The writer problem can be addressed by implementing an idempotent writer.
> > However, an alternative and more powerful approach is to support
> > transactions.
> >
> > *What does a transaction mean?*
> >
> > A transaction means that a collection of records can be written
> > transactionally within a stream or across multiple streams. The records
> > will be consumed by the reader together when the transaction is
> > committed, or will never be consumed by the reader when the transaction
> > is aborted.
> >
> > The transaction will expose the following guarantees:
> >
> > - The reader should not be exposed to records written from uncommitted
> > transactions (mandatory)
> > - The reader should consume the records in the transaction commit order
> > rather than the record written order (mandatory)
> > - No duplicated records within a transaction (mandatory)
> > - Allow interleaving transactional writes and non-transactional writes
> > (optional)
> >
> > *Stream Transaction & Namespace Transaction*
> >
> > There will be two types of transactions: the stream-level transaction
> > (local transaction) and the namespace-level transaction (global
> > transaction).
> >
> > The stream level transaction is a transactional operation on writing
> > records to one stream; the namespace level transaction is a transactional
> > operation on writing records to multiple streams.
> >
> > *Implementation Thoughts*
> >
> > - A transaction consists of a begin control record, a series of data
> > records, and a commit/abort control record.
> > - The begin/commit/abort control records are written to a `commit` log
> > stream, while the data records are written to normal data log streams.
> > - For a stream-level transaction, the `commit` log stream will be the
> > same log stream as the data stream, while for namespace-level
> > transactions it will be a *system* stream (or multiple system streams).
> > - The transaction code looks like the following:
> >
> > <code>
> >
> > // Proposed API sketch; here streams are assumed to be identified by name.
> > Transaction txn = client.transaction();
> > Future<DLSN> result1 = txn.write("stream-0", record);
> > Future<DLSN> result2 = txn.write("stream-1", record);
> > Future<DLSN> result3 = txn.write("stream-2", record);
> > Future<Pair<DLSN, DLSN>> result = txn.commit();
> >
> > </code>
> >
> > If the txn is committed, all the write futures will be satisfied with
> > their written DLSNs. If the txn is aborted, all the write futures will
> > fail together. There is no partial failure state.
> >
> > - The actual data flow will be:
> >
> > 1. The writer gets a transaction id from the owner of the `commit` log
> > stream.
> > 2. The writer writes the begin control record (synchronously) with the
> > transaction id.
> > 3. Each write within the same txn is assigned a local sequence number
> > starting from 0. The combination of transaction id and local sequence
> > number will be used later on by the readers to de-duplicate records.
> > 4. The commit/abort control record is written based on the results from
> > step 3.
> >
> > - The application can supply a timeout when it begins a transaction
> > (#begin()). The owner of the `commit` log stream can abort transactions
> > that are not committed or aborted within their timeout.
> >
> > - Failures:
> >
> > * All the log records can simply be retried, as they will be properly
> > de-duplicated on the reader side.
> >
> > - Reader:
> >
> > * The reader can be configured to read uncommitted records or committed
> > records only (by default it reads uncommitted records).
> > * If the reader is configured to read committed records only, the read
> > ahead cache will be changed to maintain one additional map of pending
> > committed records. This map is bounded, and records will be dropped from
> > it as the read ahead moves forward.
> > * When the reader hits a commit record, it will rewind to the begin
> > record and start reading from there. By leveraging the read ahead cache
> > and the pending committed records cache, this should work well for both
> > short transactions and long transactions.
> >
> > - DLSN, SequenceId:
> >
> > * We will add a fourth field to DLSN: the `local sequence number` within
> > a transaction session. The new DLSN of a record in a transaction will be
> > the DLSN of the commit control record plus the record's local sequence
> > number.
> > * The sequence id will still be the position of the commit record plus
> > the record's local sequence number. The position will be advanced by the
> > total number of written records when the commit control record is
> > written.
> >
> > - Transaction Group & Namespace Transaction
> >
> > Using a single log stream for namespace transactions can become a
> > bottleneck, since all the begin/commit/abort control records will have
> > to go through that one log stream.
> >
> > The idea of a 'transaction group' is to allow partitioning the writers
> > into different transaction groups.
> >
> > Clients can specify the `group-name` when starting a transaction. If no
> > `group-name` is specified, the default `commit` log in the namespace
> > will be used for creating transactions.
> >
> > -------------------------------------------------
> >
> > I'd like to collect feedback on this idea. We appreciate any comments,
> > and if anyone is also interested in this idea, we'd like to collaborate
> > with the community.
> >
> >
> > - Xi
> >
>
