Once the messaging API supports transactions, there will be no need
for a base class for functions. Instead, a config option will enable
transactional mode.
--
Matteo Merli
<matteo.me...@gmail.com>

On Sat, Mar 2, 2019 at 6:39 PM Sijie Guo <guosi...@gmail.com> wrote:
>
> Dave,
>
> Do you mean implementing transactions in Pulsar Functions?
>
> - Sijie
>
> On Sun, Mar 3, 2019 at 1:52 AM Dave Fisher <dave2w...@comcast.net> wrote:
>
> > Hi -
> >
> > Is this a case where a Pulsar function base class for transactions would
> > help?
> >
> > Regards,
> > Dave
> >
> > Sent from my iPhone
> >
> > > On Mar 2, 2019, at 2:39 AM, Sijie Guo <guosi...@gmail.com> wrote:
> > >
> > > > Pravega's model is better than Kafka's: it addresses the interleaving
> > > > problem. However, Pravega's model is based on a giant replicated log
> > > > and rewrites the data to a second storage tier for persistence, which
> > > > basically re-implements BookKeeper's logic in the broker. A fundamental
> > > > drawback of Pravega is write amplification. The amplification of both
> > > > network and IO bandwidth is huge. If you use BookKeeper for both its
> > > > first-tier and second-tier storage and assume a BookKeeper replication
> > > > factor of 3, Pravega requires 6x network bandwidth and 12x IO bandwidth.
> > > > For a given message, it needs to write 3 times into the journal, and 3
> > > > times for persistence. This amplification severely limits the throughput
> > > > of Pravega "brokers".
> > >
> > > - Sijie
> > >
> > >
> > >
> > >> On Sat, Mar 2, 2019 at 6:13 PM Ali Ahmed <ahmal...@gmail.com> wrote:
> > >>
> > > >> I agree; we may want to review Pravega's past efforts in this area too.
> > >>
> > >>
> > >>
> > https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
> > >>
> > >>
> > https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java
> > >>
> > >> -Ali
> > >>
> > >>> On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <guosi...@gmail.com> wrote:
> > >>>
> > > >>> Kafka's implementation interleaves committed messages with uncommitted
> > > >>> messages in storage. Personally, I think it is a very ugly design and
> > > >>> implementation.
> > > >>>
> > > >>> Pulsar is a segment-centric system with a shared segment store,
> > > >>> BookKeeper. I think a better direction is to leverage segments (aka
> > > >>> ledgers) for buffering uncommitted messages and to commit the whole
> > > >>> segment when the whole transaction is committed.
> > >>>
> > >>> A rough idea would be:
> > >>>
> > > >>> 1) For any transaction, write the messages to a separate ledger (or
> > > >>> multiple separate ledgers).
> > > >>> 2) During the transaction, accumulate the messages in those ledgers.
> > > >>> 3) On commit, merge the txn ledgers back into the main data ledger. The
> > > >>> merge can be done either by adding a meta message to the data ledger
> > > >>> that points at the data stored in the txn ledger, or by actually
> > > >>> copying the data to the data ledger (depending on the size of the data
> > > >>> accumulated in the transaction).
> > > >>> 4) On abort, delete the txn ledger. No additional work needs to be
> > > >>> done.
> > > >>>
> > > >>> This would produce a much cleaner design than Kafka's.
> > >>>
> > >>> On Ivan's comments:
> > >>>
> > >>>> Transactional acknowledgement also needs to be taken into account
> > >>>
> > > >>> I don't think we have to treat `transactional acknowledgement` as a
> > > >>> special case. Currently, acknowledgements are actually "append"
> > > >>> operations into cursor ledgers.
> > > >>> So the problem can be reduced to an `atomic append` to both data
> > > >>> ledgers and cursor ledgers. That way, we can use one solution for
> > > >>> both appending data and updating cursors.
> > >>>
> > > >>> Additionally, I think a related topic is supporting large messages
> > > >>> (e.g. >= 5MB). If we take the approach I described above, using a
> > > >>> separate ledger to accumulate the messages of a transaction, we can
> > > >>> easily model a large message as a transaction of chunked messages.
> > >>>
> > > >>> @Richard, @Ivan, let me know what you think. If you think the
> > > >>> direction I raised is a good one to go down, I am happy to write it
> > > >>> up in detail, drive the design, and coordinate the implementation in
> > > >>> the community.
> > >>>
> > >>> - Sijie
> > >>>
> > >>> On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <yohan.richard...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hi all,
> > >>>>
> > >>>> We might be able to get some ideas on implementing this from Kafka:
> > >>>>
> > >>>>
> > >>>
> > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> > >>>>
> > > >>>> Obviously, there are some differences between Kafka and Pulsar
> > > >>>> internals, but at some level the implementation would be similar.
> > >>>> It should help.
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <
> > yohan.richard...@gmail.com
> > >>>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi,
> > >>>>>
> > >>>>> Per request, I've created a doc so we could get some more input in an
> > >>>>> organized manner:
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
> > >>>>>
> > >>>>> And for Ivan's questions, I would answer accordingly.
> > >>>>>
> > >>>>>> By "set the message to unknown", do you mean the broker will cache
> > >> the
> > >>>>>> message, not writing it to any log?
> > >>>>>
> > >>>>> We wouldn't cache the message from my interpretation of the steps.
> > >> What
> > >>>>> the producer is first sending is a pre-processing message, not the
> > >> real
> > >>>>> message itself. This step basically notifies the broker that the
> > >>> message
> > >>>> is
> > >>>>> on its way. So all we have to do is store the message id and its
> > >>>>> corresponding status in a map, and depending on the producer's
> > >>> response,
> > >>>>> the status will change accordingly.
> > >>>>>
> > >>>>>> In designs we've discussed previously, this was handled
> > >>>>>> by a component called the transaction coordinator, which is a
> > >> logical
> > >>>>>> component which each broker knows how to talk to. For a transaction
> > >>>>>> the commit message is sent to the coordinator, which writes it to
> > >> its
> > >>>>>> own log, and then goes through each topic in the commit and marks
> > >> the
> > >>>>>> transaction as completed.
> > >>>>>
> > >>>>> I wasn't aware of previous discussions on this topic, but it seems
> > >>> pretty
> > >>>>> good to me. It's certainly better than what I would come up with.
> > > >>>>> If there are any more things we need to talk about, I suppose we
> > > >>>>> could move the discussion to the google doc to play around with.
> > >>>>>
> > >>>>> Hope we can get this PIP rolling.
> > >>>>>
> > >>>>>
> > >>>>> On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <guosi...@gmail.com>
> > >> wrote:
> > >>>>>
> > >>>>>> Richard,
> > >>>>>>
> > > >>>>>> Thank you for putting this up and pushing the discussion forward.
> > >>>>>>
> > >>>>>> I think this is a very large feature. It might be worth creating a
> > >>>> google
> > >>>>>> doc for it (which is better for collaboration). And I believe Ivan
> > >> has
> > >>>>>> some
> > >>>>>> thoughts as well. If you can put up a google doc (make it
> > >>>> world-editable),
> > > >>>>>> Ivan can probably dump his thoughts there and we can finalize the
> > > >>>>>> discussion and break it down into tasks. That way the whole community
> > > >>>>>> can actually collaborate on this.
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> Sijie
> > >>>>>>
> > >>>>>> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <
> > >>> yohan.richard...@gmail.com>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi all,
> > >>>>>>>
> > >>>>>>> I would like to create a PIP for issue #2664 on Github. The
> > >> details
> > >>> of
> > >>>>>> the
> > >>>>>>> PIP are below.
> > >>>>>>> I hope we could discuss this thoroughly.
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>> Richard
> > >>>>>>>
> > >>>>>>> PIP-31: Add support for transactional messaging
> > >>>>>>>
> > > >>>>>>> Motivation: Pulsar could improve its message delivery by
> > > >>>>>>> implementing transactional messaging. Such a system enforces
> > > >>>>>>> eventual consistency and allows operations to be performed
> > > >>>>>>> atomically.
> > >>>>>>>
> > >>>>>>> Proposal:
> > >>>>>>>
> > >>>>>>> As described in the issue, we would implement the following policy
> > >>> in
> > >>>>>>> Producer and Pulsar Broker:
> > >>>>>>> 1. The producer produces the pre-processing transaction message.
> > >> At
> > >>>> this
> > >>>>>>> point, the broker will set the status of this message to unknown.
> > >>>>>>> 2. After the local transaction is successfully executed, the
> > >> commit
> > >>>>>> message
> > >>>>>>> is sent, otherwise the rollback message is sent.
> > >>>>>>> 3. The broker receives the message. If it is a commit message, it
> > >>>>>> modifies
> > >>>>>>> the transaction status to commit, and then sends an actual message
> > >>> to
> > >>>>>> the
> > >>>>>>> consumer queue. At this time, the consumer can consume the
> > >> message.
> > >>>>>>> Otherwise, the transaction status is modified to rollback. The
> > >>> message
> > >>>>>> will
> > >>>>>>> be discarded.
> > > >>>>>>> 4. If the producer is down or behaving abnormally at step 2, the
> > > >>>>>>> broker will periodically ask that producer for the status of the
> > > >>>>>>> message, update the status according to the producer's response,
> > > >>>>>>> and then process the message as described in step 3.
> > >>>>>>>
> > >>>>>>> Specific concerns:
> > >>>>>>> There are a number of things we will improve upon or add:
> > > >>>>>>> - A configuration called ```maxMessageUnknownTime```. Consider this
> > > >>>>>>> scenario: the pre-processing transaction message is sent, but the
> > > >>>>>>> commit or rollback message is never received, which could leave the
> > > >>>>>>> status of a message permanently unknown. To prevent this, we would
> > > >>>>>>> need a config which limits the amount of time the status of a message
> > > >>>>>>> can remain unknown (i.e. ```maxMessageUnknownTime```). After that, the
> > > >>>>>>> message would be discarded.
> > >>>>>>> - Logging would be updated to log the status of a message i.e.
> > >>>> UNKNOWN,
> > >>>>>>> ROLLBACK, or COMMITTED. This would allow the user to know whether
> > >> or
> > >>>>>> not a
> > >>>>>>> message had failed or fallen through.
> > >>>>>>>
> > >>>>>>> Possible Additional API:
> > >>>>>>> - We would add a method which allows the user to query the state
> > >> of
> > >>>> the
> > >>>>>>> message i.e. ```getStateOfMessage(long id)```
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >> --
> > >> -Ali
> > >>
> >
> >
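
To make the ledger-based direction Sijie outlines in the quoted thread a bit more concrete, here is a minimal Python sketch of the four steps (separate txn ledger, accumulate, merge on commit, delete on abort). It is only a toy in-memory model, not Pulsar or BookKeeper code: `TopicLedgers` and its methods are hypothetical names, plain lists stand in for ledgers, and commit uses the "copy the data" variant of the merge rather than a meta message.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class TopicLedgers:
    """Toy model: one main data ledger plus one ledger per open transaction.

    All names here are hypothetical; lists stand in for BookKeeper ledgers.
    """
    data_ledger: List[str] = field(default_factory=list)
    txn_ledgers: Dict[str, List[str]] = field(default_factory=dict)

    def begin(self, txn_id: str) -> None:
        # Step 1: every transaction writes to its own separate ledger.
        self.txn_ledgers[txn_id] = []

    def append(self, txn_id: str, message: str) -> None:
        # Step 2: messages accumulate in the txn ledger and never touch
        # the data ledger while the transaction is open.
        self.txn_ledgers[txn_id].append(message)

    def commit(self, txn_id: str) -> None:
        # Step 3: merge the txn ledger into the data ledger. This sketch
        # copies the data; the thread also mentions a meta-message variant.
        self.data_ledger.extend(self.txn_ledgers.pop(txn_id))

    def abort(self, txn_id: str) -> None:
        # Step 4: drop the txn ledger; the data ledger needs no cleanup.
        del self.txn_ledgers[txn_id]


topic = TopicLedgers()
topic.begin("txn-a")
topic.begin("txn-b")
topic.append("txn-a", "a1")
topic.append("txn-b", "b1")  # interleaved in time, but not in storage
topic.append("txn-a", "a2")
topic.commit("txn-a")
topic.abort("txn-b")
print(topic.data_ledger)  # ['a1', 'a2']
```

Because uncommitted messages live only in their own ledger, the data ledger never holds a mix of committed and uncommitted entries, which is the interleaving concern raised about Kafka in the thread.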

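The status tracking in the quoted PIP-31 proposal (UNKNOWN, then COMMITTED or ROLLBACK, bounded by ```maxMessageUnknownTime```) could be sketched roughly as below. This is an illustration under assumptions, not broker code: `StatusTable` and its methods are hypothetical, timestamps are passed in explicitly to keep the example deterministic, and an expired message is marked ROLLBACK as one possible reading of "discarded".

```python
from enum import Enum
from typing import Dict, Tuple


class Status(Enum):
    UNKNOWN = "UNKNOWN"
    COMMITTED = "COMMITTED"
    ROLLBACK = "ROLLBACK"


class StatusTable:
    """Hypothetical map from message id to (status, time the status was set)."""

    def __init__(self, max_message_unknown_time: float) -> None:
        self.max_message_unknown_time = max_message_unknown_time
        self._entries: Dict[int, Tuple[Status, float]] = {}

    def prepare(self, msg_id: int, now: float) -> None:
        # Step 1 of the proposal: pre-processing message arrives.
        self._entries[msg_id] = (Status.UNKNOWN, now)

    def resolve(self, msg_id: int, committed: bool, now: float) -> None:
        # Steps 2-3: a commit or rollback message settles an UNKNOWN entry.
        status, _ = self._entries[msg_id]
        if status is Status.UNKNOWN:
            new_status = Status.COMMITTED if committed else Status.ROLLBACK
            self._entries[msg_id] = (new_status, now)

    def expire(self, now: float) -> None:
        # Assumption: entries UNKNOWN for longer than the limit are
        # treated as rolled back (the proposal says "discarded").
        for msg_id, (status, since) in list(self._entries.items()):
            if status is Status.UNKNOWN and now - since > self.max_message_unknown_time:
                self._entries[msg_id] = (Status.ROLLBACK, now)

    def get_state_of_message(self, msg_id: int) -> Status:
        # Mirrors the proposed getStateOfMessage(long id) query.
        return self._entries[msg_id][0]


table = StatusTable(max_message_unknown_time=30.0)
table.prepare(1, now=0.0)
table.prepare(2, now=0.0)
table.resolve(1, committed=True, now=5.0)
table.expire(now=31.0)  # message 2 never got a commit or rollback
print(table.get_state_of_message(1).value)  # COMMITTED
print(table.get_state_of_message(2).value)  # ROLLBACK
```

A real broker would also need the periodic producer check from step 4 and durable storage for this table; both are left out here.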