I agree; we may also want to review Pravega's past efforts in this area.

https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java

-Ali

On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <guosi...@gmail.com> wrote:

> Kafka's implementation interleaves committed messages with uncommitted
> messages in storage. Personally, I think that is a very ugly design and
> implementation.
>
> Pulsar is a segment-centric system with shared segment storage
> (BookKeeper). I think a better direction is to use segments (aka ledgers)
> to buffer uncommitted messages, and to commit the whole segment when the
> whole transaction is committed.
>
> A rough idea would be:
>
> 1) For each transaction, write the messages to a separate ledger (or to
> multiple separate ledgers).
> 2) During the transaction, accumulate the messages in those ledgers.
> 3) On commit, merge the txn ledgers back into the main data ledger. The
> merge can be done either by adding a meta message that points to the data
> stored in the txn ledger, or by actually copying the data to the data
> ledger (depending on the size of the data accumulated in the transaction).
> 4) On abort, delete the txn ledger. No additional work is needed.
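
A minimal in-memory sketch of the four steps above, for illustration only. It does not use the BookKeeper API: `LedgerStore`, `COPY_LIMIT`, and the `("data", ...)` / `("meta", ...)` entry shapes are hypothetical names, and plain Python lists stand in for ledgers.

```python
from dataclasses import dataclass, field

# Hypothetical threshold: at or below it, commit copies the data; above it,
# commit appends a pointer (a "meta message") to the txn ledger instead.
COPY_LIMIT = 3


@dataclass
class LedgerStore:
    data_ledger: list = field(default_factory=list)   # main data ledger
    txn_ledgers: dict = field(default_factory=dict)   # txn id -> buffered messages

    def begin(self, txn_id):
        self.txn_ledgers[txn_id] = []                  # step 1: separate ledger

    def write(self, txn_id, message):
        self.txn_ledgers[txn_id].append(message)       # step 2: accumulate

    def commit(self, txn_id):                          # step 3: merge
        buffered = self.txn_ledgers[txn_id]
        if len(buffered) <= COPY_LIMIT:
            # Small transaction: copy the data, then drop the txn ledger.
            self.data_ledger.extend(("data", m) for m in buffered)
            del self.txn_ledgers[txn_id]
        else:
            # Large transaction: keep the txn ledger and reference it.
            self.data_ledger.append(("meta", txn_id))

    def abort(self, txn_id):                           # step 4: delete
        del self.txn_ledgers[txn_id]

    def read(self):
        """Yield committed messages in order, resolving meta messages."""
        for kind, value in self.data_ledger:
            if kind == "data":
                yield value
            else:
                yield from self.txn_ledgers[value]
```

In this sketch uncommitted messages never appear in `read()`, because a reader only follows entries that a commit has placed in the data ledger.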
>
> This would produce a much cleaner design than Kafka's.
>
> On Ivan's comments:
>
> > Transactional acknowledgement also needs to be taken into account
>
> I don't think we have to treat `transactional acknowledgement` as a special
> case. Currently, acknowledgements are actually "append" operations into
> cursor ledgers.
> So the problem can be reduced to an `atomic append` to both data ledgers
> and cursor ledgers. That way, we can use one solution for both appending
> data and updating cursors.
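
To illustrate the reduction, here is a rough sketch in which publishing and acknowledging are both just buffered appends that become visible together on commit. The `Transaction` class and the `"data"` / `"cursor"` keys are hypothetical, and the sketch is single-process and in-memory, so it shows only the shape of the idea, not how atomicity would be achieved across real ledgers.

```python
class Transaction:
    """Buffers appends to several ledgers and applies them together."""

    def __init__(self, ledgers):
        self.ledgers = ledgers   # e.g. {"data": [...], "cursor": [...]}
        self.pending = []        # (ledger name, entry) in arrival order

    def publish(self, message):
        # A produced message is an append to the data ledger.
        self.pending.append(("data", message))

    def ack(self, message_id):
        # An acknowledgement is an append to the cursor ledger.
        self.pending.append(("cursor", message_id))

    def commit(self):
        # Both kinds of append go through the same code path.
        for name, entry in self.pending:
            self.ledgers[name].append(entry)
        self.pending.clear()

    def abort(self):
        # Nothing was written to the real ledgers, so just drop the buffer.
        self.pending.clear()
```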
>
> Additionally, I think a related topic is support for large messages
> (e.g. >= 5MB). If we take the approach I described above, using a separate
> ledger to accumulate the messages of a transaction, then it is easy to
> model a large message as a transaction of chunked messages.
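
A small sketch of what modelling a large message as a transaction of chunks could look like. The function names and the use of a Python list as the txn ledger are assumptions for illustration; chunk size would be whatever the broker's message size limit allows.

```python
def split_into_chunks(payload, chunk_size):
    """Split a payload into consecutive chunks of at most chunk_size bytes."""
    return [payload[i:i + chunk_size] for i in range(0, len(payload), chunk_size)]


def write_large_message(payload, chunk_size, txn_ledger):
    """Append every chunk to the transaction's own ledger (a list here)."""
    for part in split_into_chunks(payload, chunk_size):
        txn_ledger.append(part)


def read_large_message(txn_ledger):
    """After commit, a reader reassembles the chunks in ledger order."""
    return b"".join(txn_ledger)
```

Because the chunks only become visible when the transaction commits, a consumer would never observe a partially written large message.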
>
> @Richard, @Ivan let me know what you think. If you think the direction I
> raised is a good one to go down, I am happy to write it up in detail,
> drive the design, and coordinate the implementation in the community.
>
> - Sijie
>
> On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <yohan.richard...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > We might be able to get some ideas on implementing this from Kafka:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> >
> > Obviously, there are some differences between Kafka and Pulsar internals,
> > but at some level the implementation would be similar.
> > It should help.
> >
> >
> >
> > On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <yohan.richard...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > Per request, I've created a doc so we could get some more input in an
> > > organized manner:
> > >
> > > https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
> > >
> > > As for Ivan's questions, I'll answer them in turn.
> > >
> > > > By "set the message to unknown", do you mean the broker will cache the
> > > > message, not writing it to any log?
> > >
> > > By my reading of the steps, we wouldn't cache the message. What the
> > > producer sends first is a pre-processing message, not the real message
> > > itself. This step basically notifies the broker that the message is on
> > > its way. So all we have to do is store the message id and its
> > > corresponding status in a map, and the status will change according to
> > > the producer's response.
> > >
> > > > In designs we've discussed previously, this was handled
> > > > by a component called the transaction coordinator, which is a logical
> > > > component which each broker knows how to talk to. For a transaction
> > > > the commit message is sent to the coordinator, which writes it to its
> > > > own log, and then goes through each topic in the commit and marks the
> > > > transaction as completed.
> > >
> > > I wasn't aware of previous discussions on this topic, but it seems
> > > pretty good to me. It's certainly better than what I would come up with.
> > > If there are any more things we need to talk about, I suppose we could
> > > move them to the google doc to play around with.
> > >
> > > Hope we can get this PIP rolling.
> > >
> > >
> > > On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <guosi...@gmail.com> wrote:
> > >
> > >> Richard,
> > >>
> > >> Thank you for putting this up and pushing the discussion forward.
> > >>
> > >> I think this is a very large feature. It might be worth creating a
> > >> google doc for it (which is better for collaboration). And I believe
> > >> Ivan has some thoughts as well. If you can put up a google doc (make it
> > >> world-editable), Ivan can probably dump his thoughts there and we can
> > >> finalize the discussion and break it down into tasks. That way the whole
> > >> community can actually collaborate on this.
> > >>
> > >> Thanks,
> > >> Sijie
> > >>
> > >> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <yohan.richard...@gmail.com>
> > >> wrote:
> > >>
> > >> > Hi all,
> > >> >
> > >> > I would like to create a PIP for issue #2664 on GitHub. The details
> > >> > of the PIP are below.
> > >> > I hope we can discuss this thoroughly.
> > >> >
> > >> > Cheers,
> > >> > Richard
> > >> >
> > >> > PIP-31: Add support for transactional messaging
> > >> >
> > >> > Motivation: Pulsar could improve on its current system of sending
> > >> > data by implementing transactional messaging. Such a system enforces
> > >> > eventual consistency and allows operations to be performed
> > >> > atomically.
> > >> >
> > >> > Proposal:
> > >> >
> > >> > As described in the issue, we would implement the following policy in
> > >> > the Producer and the Pulsar Broker:
> > >> > 1. The producer produces the pre-processing transaction message. At
> > >> > this point, the broker sets the status of this message to unknown.
> > >> > 2. If the local transaction executes successfully, the commit message
> > >> > is sent; otherwise the rollback message is sent.
> > >> > 3. The broker receives the message. If it is a commit message, the
> > >> > broker changes the transaction status to commit and then sends the
> > >> > actual message to the consumer queue. At this point, the consumer can
> > >> > consume the message. Otherwise, the transaction status is changed to
> > >> > rollback and the message is discarded.
> > >> > 4. If the producer is down or behaving abnormally at step 2, the
> > >> > broker periodically asks that producer for the status of the message,
> > >> > updates the status according to the producer's response, and then
> > >> > processes the message as in step 3.
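
The four steps can be sketched as a small status map on the broker side. This is an illustration under stated assumptions, not a proposed implementation: `StatusTrackingBroker`, its method names, and the `ask_producer` callback are hypothetical, and the sketch assumes the commit message carries the payload, which the proposal does not spell out.

```python
from enum import Enum


class Status(Enum):
    UNKNOWN = "UNKNOWN"
    COMMITTED = "COMMITTED"
    ROLLBACK = "ROLLBACK"


class StatusTrackingBroker:
    def __init__(self):
        self.status = {}          # message id -> Status
        self.consumer_queue = []  # payloads visible to consumers

    def on_prepare(self, message_id):
        # Step 1: the pre-processing message only registers the id.
        self.status[message_id] = Status.UNKNOWN

    def on_commit(self, message_id, payload):
        # Step 3 (commit). Assumption: the commit message carries the payload.
        if self.status.get(message_id) is Status.UNKNOWN:
            self.status[message_id] = Status.COMMITTED
            self.consumer_queue.append(payload)

    def on_rollback(self, message_id):
        # Step 3 (rollback): nothing reaches the consumer queue.
        if self.status.get(message_id) is Status.UNKNOWN:
            self.status[message_id] = Status.ROLLBACK

    def poll_unknown(self, ask_producer):
        # Step 4: ask the producer about each message that is still UNKNOWN.
        # ask_producer(id) returns ("commit", payload), ("rollback", None),
        # or None when the producer cannot answer yet.
        for message_id, status in list(self.status.items()):
            if status is not Status.UNKNOWN:
                continue
            answer = ask_producer(message_id)
            if answer is None:
                continue
            decision, payload = answer
            if decision == "commit":
                self.on_commit(message_id, payload)
            else:
                self.on_rollback(message_id)

    def get_state_of_message(self, message_id):
        # Returns None for ids the broker has never seen.
        return self.status.get(message_id)
```

Only an UNKNOWN message can change status here, so a late or duplicate commit for a message that was already rolled back is ignored.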
> > >> >
> > >> > Specific concerns:
> > >> > There are a number of things we will improve upon or add:
> > >> > - A configuration called ```maxMessageUnknownTime```. Consider this
> > >> > scenario: the pre-processing transaction message is sent, but the
> > >> > commit or rollback message is never received, which could leave the
> > >> > status of a message permanently unknown. To prevent this, we would
> > >> > need a config that limits how long the status of a message can remain
> > >> > unknown (i.e. ```maxMessageUnknownTime```). After that, the message
> > >> > would be discarded.
> > >> > - Logging would be updated to record the status of a message, i.e.
> > >> > UNKNOWN, ROLLBACK, or COMMITTED. This would let the user know whether
> > >> > or not a message had failed or fallen through.
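
One possible shape for the `maxMessageUnknownTime` check, as a sketch only. The `UnknownTracker` class, the choice of seconds as the unit, and the injectable `clock` are all assumptions made for illustration; the proposal names only the config itself.

```python
import logging
import time

log = logging.getLogger("txn-status")


class UnknownTracker:
    def __init__(self, max_message_unknown_time, clock=time.monotonic):
        self.max_unknown = max_message_unknown_time  # assumed to be seconds
        self.clock = clock                           # injectable for testing
        self.unknown_since = {}                      # message id -> timestamp

    def mark_unknown(self, message_id):
        self.unknown_since[message_id] = self.clock()
        log.info("message %s status=UNKNOWN", message_id)

    def resolve(self, message_id, status):
        # Called when a commit or rollback message finally arrives.
        self.unknown_since.pop(message_id, None)
        log.info("message %s status=%s", message_id, status)

    def expire(self):
        """Discard messages that stayed UNKNOWN longer than the limit."""
        now = self.clock()
        expired = [mid for mid, since in self.unknown_since.items()
                   if now - since > self.max_unknown]
        for mid in expired:
            del self.unknown_since[mid]
            log.warning("message %s discarded after %ss UNKNOWN",
                        mid, self.max_unknown)
        return expired
```

A broker could call `expire()` from the same periodic task that asks producers about unresolved messages.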
> > >> >
> > >> > Possible Additional API:
> > >> > - We would add a method which allows the user to query the state of
> > >> > the message, i.e. ```getStateOfMessage(long id)```
> > >> >
> > >>
> > >
> >
>


-- 
-Ali
