Matteo, Dave,

I think you are talking about different things. My comments to both:

> Once there's support for transactions in messaging API, there will be
> no need for a base class for functions. Rather a config option will
> allow to enable transactional mode.

Matteo, if I understand your comment correctly, you are talking about
functions using transactions for processing semantics. If so, yes that
would be the end goal.

> Yes, that way there is no additional broker overhead and whatever happens
> when a commit happens is under the control of those making the transaction.

Dave, this sounds like an interesting idea, and it is definitely doable.
Pulsar is a multi-layered system built on top of reliable storage, so many
of its components are "stateless" and "logical", not bound to any physical
machine. When we implement a component or a piece of functionality, we are
basically implementing a logical unit. How that logical unit runs can be
very flexible: as a separate service, as part of the broker, or in
functions.
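
To make the "logical unit" point a bit more concrete, here is a rough Java
sketch. It is not real Pulsar code: every type name below
(TransactionCoordinator, InMemoryCoordinator, Host, HostingDemo) is
hypothetical, and the in-memory map only stands in for state that would
really live in reliable storage. The idea is just that the logic is written
once and the hosting process is chosen separately.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical names throughout; none of these types exist in Pulsar.
enum TxnStatus { OPEN, COMMITTED, ABORTED }

/** The logical unit: transaction bookkeeping with no tie to a host process. */
interface TransactionCoordinator {
    long begin();
    void commit(long txnId);
    void abort(long txnId);
    TxnStatus status(long txnId);
}

/** In-memory stand-in; a real unit would keep this state in reliable storage. */
final class InMemoryCoordinator implements TransactionCoordinator {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, TxnStatus> txns = new ConcurrentHashMap<>();

    public long begin() {
        long id = nextId.incrementAndGet();
        txns.put(id, TxnStatus.OPEN);
        return id;
    }

    public void commit(long txnId) { finish(txnId, TxnStatus.COMMITTED); }

    public void abort(long txnId) { finish(txnId, TxnStatus.ABORTED); }

    public TxnStatus status(long txnId) {
        TxnStatus s = txns.get(txnId);
        if (s == null) {
            throw new IllegalArgumentException("unknown txn " + txnId);
        }
        return s;
    }

    private void finish(long txnId, TxnStatus target) {
        // Only an OPEN transaction may move to a terminal state.
        if (!txns.replace(txnId, TxnStatus.OPEN, target)) {
            throw new IllegalStateException("txn " + txnId + " is not open");
        }
    }
}

/** Any host (broker, separate service, function) wraps the same unit. */
final class Host {
    final String name;
    final TransactionCoordinator coordinator;

    Host(String name, TransactionCoordinator coordinator) {
        this.name = name;
        this.coordinator = coordinator;
    }

    /** Runs one trivial transaction through the hosted unit. */
    TxnStatus runAndCommit() {
        long id = coordinator.begin();
        coordinator.commit(id);
        return coordinator.status(id);
    }
}

final class HostingDemo {
    public static void main(String[] args) {
        // The same logic, hosted in three different places.
        for (String where : new String[] {"broker", "separate-service", "function"}) {
            Host host = new Host(where, new InMemoryCoordinator());
            System.out.println(host.name + " -> " + host.runAndCommit());
        }
    }
}
```

Each of the three hosts drives the same coordinator code to COMMITTED; only
the place where it runs differs.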

- Sijie


On Sun, Mar 3, 2019 at 10:52 AM Dave Fisher <dave2w...@comcast.net> wrote:

> Hi -
>
> > On Mar 2, 2019, at 6:39 PM, Sijie Guo <guosi...@gmail.com> wrote:
> >
> > Dave,
> >
> > You mean implementing the transactions in pulsar function?
>
> Yes, that way there is no additional broker overhead and whatever happens
> when a commit happens is under the control of those making the transaction.
>
> I’m not sure if it would work, but it seems that functions, spouts, and
> connectors make sense as opposed to burdening the highly performant brokers.
>
> Regards,
> Dave
>
> >
> > - Sijie
> >
> >> On Sun, Mar 3, 2019 at 1:52 AM Dave Fisher <dave2w...@comcast.net> wrote:
> >>
> >> Hi -
> >>
> >> Is this a case where a Pulsar function base class for transactions would
> >> help?
> >>
> >> Regards,
> >> Dave
> >>
> >> Sent from my iPhone
> >>
> >>> On Mar 2, 2019, at 2:39 AM, Sijie Guo <guosi...@gmail.com> wrote:
> >>>
> >>> Pravega's model is better than Kafka's: it addresses the interleaving
> >>> problem. However, Pravega's model is based on a giant replicated log
> >>> and rewrites the data to a second-tier storage for persistence, which
> >>> basically re-implements bookkeeper's logic in the broker. A fundamental
> >>> drawback of Pravega is write amplification. The amplification of both
> >>> network and IO bandwidth is huge. If you use bookkeeper for both its
> >>> first- and second-tier storage and assume a bookkeeper replication
> >>> factor of 3, pravega requires 6x network bandwidth and 12x IO bandwidth.
> >>> For a given message, it needs to write 3 times into the journal and 3
> >>> times for persistence. The amplification hugely limits the throughput at
> >>> pravega "brokers".
> >>>
> >>> - Sijie
> >>>
> >>>
> >>>
> >>>> On Sat, Mar 2, 2019 at 6:13 PM Ali Ahmed <ahmal...@gmail.com> wrote:
> >>>>
> >>>> I agree; we may want to review pravega's past efforts in this area also.
> >>>>
> >>>> https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
> >>>>
> >>>> https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java
> >>>>
> >>>> -Ali
> >>>>
> >>>>> On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <guosi...@gmail.com> wrote:
> >>>>>
> >>>>> Kafka's implementation interleaves committed messages with uncommitted
> >>>>> messages in storage. Personally, I think it is a very ugly design and
> >>>>> implementation.
> >>>>>
> >>>>> Pulsar is a segment-centric system with a shared segment store,
> >>>>> bookkeeper. I think a better direction is to use segments (aka
> >>>>> ledgers) to buffer uncommitted messages, and to commit the whole
> >>>>> segment when the whole transaction is committed.
> >>>>>
> >>>>> A rough idea would be:
> >>>>>
> >>>>> 1) For any transaction, write the messages to a separate ledger (or
> >>>>> multiple separate ledgers).
> >>>>> 2) During the transaction, accumulate the messages in those ledgers.
> >>>>> 3) On commit, merge the txn ledgers back into the main data ledger. The
> >>>>> merge can be done either by adding a meta message that points to the
> >>>>> data stored in the txn ledger, or by actually copying the data into the
> >>>>> data ledger (depending on the size of the data accumulated in the
> >>>>> transaction).
> >>>>> 4) On abort, delete the txn ledger. No additional work needs to be done.
> >>>>>
> >>>>> This would produce a much cleaner design than Kafka's.
> >>>>>
> >>>>> On Ivan's comments:
> >>>>>
> >>>>>> Transactional acknowledgement also needs to be taken into account
> >>>>>
> >>>>> I don't think we have to treat `transactional acknowledgement` as a
> >>>>> special case. Currently, acknowledgements are actually "append"
> >>>>> operations into cursor ledgers. So the problem can be reduced to an
> >>>>> `atomic append` to both data ledgers and cursor ledgers. That way, we
> >>>>> can use one solution for both appending data and updating cursors.
> >>>>>
> >>>>> Additionally, I think a topic related to transactions is support for
> >>>>> large messages (e.g. >= 5MB). If we take the approach I described
> >>>>> above, using a separate ledger to accumulate the messages of a
> >>>>> transaction, then it is easy to model a large message as a
> >>>>> transaction of chunked messages.
> >>>>>
> >>>>> @Richard, @Ivan let me know what you think. If you think the
> >>>>> direction I raised is a good one to go down, I am happy to write it
> >>>>> up in detail, drive the design, and coordinate the implementation
> >>>>> in the community.
> >>>>>
> >>>>> - Sijie
> >>>>>
> >>>>> On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <yohan.richard...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> We might be able to get some ideas on implementing this from Kafka:
> >>>>>>
> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> >>>>>>
> >>>>>> Obviously, there are some differences between Kafka and Pulsar
> >>>>>> internals, but at some level the implementation would be similar.
> >>>>>> It should help.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <yohan.richard...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> Per request, I've created a doc so we could get some more input in an
> >>>>>>> organized manner:
> >>>>>>>
> >>>>>>> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
> >>>>>>>
> >>>>>>> And for Ivan's questions, I would answer accordingly.
> >>>>>>>
> >>>>>>>> By "set the message to unknown", do you mean the broker will cache the
> >>>>>>>> message, not writing it to any log?
> >>>>>>>
> >>>>>>> We wouldn't cache the message, from my interpretation of the steps. What
> >>>>>>> the producer sends first is a pre-processing message, not the real
> >>>>>>> message itself. This step basically notifies the broker that the message
> >>>>>>> is on its way. So all we have to do is store the message id and its
> >>>>>>> corresponding status in a map, and depending on the producer's response,
> >>>>>>> the status will change accordingly.
> >>>>>>>
> >>>>>>>> In designs we've discussed previously, this was handled
> >>>>>>>> by a component called the transaction coordinator, which is a logical
> >>>>>>>> component which each broker knows how to talk to. For a transaction
> >>>>>>>> the commit message is sent to the coordinator, which writes it to its
> >>>>>>>> own log, and then goes through each topic in the commit and marks the
> >>>>>>>> transaction as completed.
> >>>>>>>
> >>>>>>> I wasn't aware of previous discussions on this topic, but it seems pretty
> >>>>>>> good to me. It's certainly better than what I would come up with.
> >>>>>>> If there are any more things we need to talk about, I suppose we could
> >>>>>>> move it to the google doc to play around with.
> >>>>>>>
> >>>>>>> Hope we can get this PIP rolling.
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <guosi...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Richard,
> >>>>>>>>
> >>>>>>>> Thank you for putting this up and pushing the discussion forward.
> >>>>>>>>
> >>>>>>>> I think this is a very large feature. It might be worth creating a
> >>>>>>>> google doc for it (which is better for collaboration). And I believe
> >>>>>>>> Ivan has some thoughts as well. If you can put up a google doc (make it
> >>>>>>>> world-editable), Ivan can probably dump his thoughts there and we can
> >>>>>>>> finalize the discussion and break it down into tasks, so the whole
> >>>>>>>> community can actually work together on this.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Sijie
> >>>>>>>>
> >>>>>>>> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <yohan.richard...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> I would like to create a PIP for issue #2664 on Github. The details of
> >>>>>>>>> the PIP are below.
> >>>>>>>>> I hope we could discuss this thoroughly.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Richard
> >>>>>>>>>
> >>>>>>>>> PIP-31: Add support for transactional messaging
> >>>>>>>>>
> >>>>>>>>> Motivation: Pulsar could improve on its current system of sending
> >>>>>>>>> packets of data by implementing transactional messaging. Such a system
> >>>>>>>>> enforces eventual consistency and allows operations to be performed
> >>>>>>>>> atomically.
> >>>>>>>>>
> >>>>>>>>> Proposal:
> >>>>>>>>>
> >>>>>>>>> As described in the issue, we would implement the following policy in
> >>>>>>>>> the Producer and the Pulsar Broker:
> >>>>>>>>> 1. The producer produces the pre-processing transaction message. At
> >>>>>>>>> this point, the broker sets the status of this message to unknown.
> >>>>>>>>> 2. After the local transaction is successfully executed, the commit
> >>>>>>>>> message is sent; otherwise, the rollback message is sent.
> >>>>>>>>> 3. The broker receives the message. If it is a commit message, the
> >>>>>>>>> broker changes the transaction status to commit and then sends the
> >>>>>>>>> actual message to the consumer queue. At this point, the consumer can
> >>>>>>>>> consume the message. Otherwise, the transaction status is changed to
> >>>>>>>>> rollback and the message is discarded.
> >>>>>>>>> 4. If the producer is down or abnormal at step 2, the broker
> >>>>>>>>> periodically asks that producer for the status of the message, updates
> >>>>>>>>> the status according to the producer's response, and then processes
> >>>>>>>>> the message as in step 3.
> >>>>>>>>>
> >>>>>>>>> Specific concerns:
> >>>>>>>>> There are a number of things we will improve upon or add:
> >>>>>>>>> - A configuration called ```maxMessageUnknownTime```. Consider this
> >>>>>>>>> scenario: the pre-processing transaction message is sent, but the
> >>>>>>>>> commit or rollback message is never received, which could mean that
> >>>>>>>>> the status of a message would be permanently unknown. To prevent this,
> >>>>>>>>> we would need a config that limits the amount of time the status of a
> >>>>>>>>> message can be unknown (i.e. ```maxMessageUnknownTime```). After that,
> >>>>>>>>> the message would be discarded.
> >>>>>>>>> - Logging would be updated to log the status of a message, i.e.
> >>>>>>>>> UNKNOWN, ROLLBACK, or COMMITTED. This would allow the user to know
> >>>>>>>>> whether or not a message had failed or fallen through.
> >>>>>>>>>
> >>>>>>>>> Possible Additional API:
> >>>>>>>>> - We would add a method that allows the user to query the state of
> >>>>>>>>> the message, i.e. ```getStateOfMessage(long id)```
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> -Ali
> >>>>
> >>
> >>
>
>
