Hi -

> On Mar 2, 2019, at 6:39 PM, Sijie Guo <guosi...@gmail.com> wrote:
> 
> Dave,
> 
> You mean implementing the transactions in pulsar function?

Yes. That way there is no additional broker overhead, and whatever happens
on commit is under the control of those making the transaction.

I’m not sure if it would work, but it seems that functions, spouts, and 
connectors make sense as opposed to burdening the highly performant brokers.

Regards,
Dave

> 
> - Sijie
> 
>> On Sun, Mar 3, 2019 at 1:52 AM Dave Fisher <dave2w...@comcast.net> wrote:
>> 
>> Hi -
>> 
>> Is this a case where a Pulsar function base class for transactions would
>> help?
>> 
>> Regards,
>> Dave
>> 
>> Sent from my iPhone
>> 
>>> On Mar 2, 2019, at 2:39 AM, Sijie Guo <guosi...@gmail.com> wrote:
>>> 
>>> Pravega's model is a better model than Kafka's - it addresses the
>>> interleaving problem. However, Pravega's model is based on a giant
>>> replicated log and rewrites the data to a second storage tier for
>>> persistence, which basically re-implements bookkeeper's logic in the
>>> broker. A fundamental drawback of Pravega is write amplification. The
>>> amplification of both network and IO bandwidth is huge. If you use
>>> bookkeeper for both its first and second storage tiers and assume a
>>> bookkeeper replication factor of 3, pravega requires 6x network
>>> bandwidth and 12x IO bandwidth. A given message is written 3 times
>>> into the journal and 3 more times for persistence. The amplification
>>> severely limits the throughput of pravega "brokers".
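
A back-of-the-envelope sketch of where those figures come from. The class and method names are made up for illustration, and the factor of two disk writes per stored copy (one for the bookie journal, one for the entry log) is my assumption; the thread itself only states the 6x and 12x totals.

```java
class WriteAmplification {
    // Network copies: every storage tier replicates the message
    // `replicationFactor` times.
    static int networkAmplification(int replicationFactor, int tiers) {
        return replicationFactor * tiers;
    }

    // Disk writes: assumes each replicated copy is written to disk
    // `diskWritesPerCopy` times (assumption: journal + entry log = 2).
    static int ioAmplification(int replicationFactor, int tiers, int diskWritesPerCopy) {
        return networkAmplification(replicationFactor, tiers) * diskWritesPerCopy;
    }

    public static void main(String[] args) {
        System.out.println(networkAmplification(3, 2) + "x network"); // 6x network
        System.out.println(ioAmplification(3, 2, 2) + "x IO");        // 12x IO
    }
}
```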
>>> 
>>> - Sijie
>>> 
>>> 
>>> 
>>>> On Sat, Mar 2, 2019 at 6:13 PM Ali Ahmed <ahmal...@gmail.com> wrote:
>>>> 
>>>> I agree we may want to review pravega's past efforts in this area also.
>>>> 
>>>> https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
>>>> 
>>>> https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java
>>>> 
>>>> -Ali
>>>> 
>>>>> On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <guosi...@gmail.com> wrote:
>>>>> 
>>>>> Kafka's implementation interleaves committed messages with uncommitted
>>>>> messages in storage. Personally I think it is a very ugly design and
>>>>> implementation.
>>>>> 
>>>>> Pulsar is a segment-centric system with a shared segment store,
>>>>> bookkeeper. I think a better direction is to leverage the segments
>>>>> (aka ledgers) for buffering uncommitted messages and to commit the
>>>>> whole segment when the whole transaction is committed.
>>>>> 
>>>>> A rough idea would be:
>>>>> 
>>>>> 1) for any transaction, write the messages to a separate ledger (or
>>>>> multiple separate ledgers).
>>>>> 2) during the transaction, accumulate the messages in those ledgers.
>>>>> 3) on commit, merge the txn ledgers back into the main data ledger. The
>>>>> merge can be done either by adding a meta message that points to the
>>>>> data stored in the txn ledger or by actually copying the data to the
>>>>> data ledger (depending on the size of the data accumulated in the
>>>>> transaction).
>>>>> 4) on abort, delete the txn ledger. No additional work needs to be
>>>>> done.
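
The four steps above could be sketched roughly as follows. This is an in-memory stand-in, not BookKeeper code: `TxnLedgerSketch` and its methods are hypothetical names, plain lists play the role of ledgers, and only the copying variant of the merge in step 3 is shown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TxnLedgerSketch {
    // The main data ledger that consumers read from.
    private final List<String> dataLedger = new ArrayList<>();
    // One private buffer (a "txn ledger") per open transaction.
    private final Map<Long, List<String>> txnLedgers = new HashMap<>();
    private long nextTxnId = 0;

    // Step 1: every transaction gets its own ledger.
    long begin() {
        long id = nextTxnId++;
        txnLedgers.put(id, new ArrayList<>());
        return id;
    }

    // Step 2: messages accumulate in the txn ledger, invisible to consumers.
    void append(long txnId, String message) {
        List<String> ledger = txnLedgers.get(txnId);
        if (ledger == null) {
            throw new IllegalStateException("unknown txn " + txnId);
        }
        ledger.add(message);
    }

    // Step 3: on commit, merge the txn ledger into the data ledger (by copying).
    void commit(long txnId) {
        List<String> ledger = txnLedgers.remove(txnId);
        if (ledger == null) {
            throw new IllegalStateException("unknown txn " + txnId);
        }
        dataLedger.addAll(ledger);
    }

    // Step 4: on abort, drop the txn ledger; nothing else to clean up.
    void abort(long txnId) {
        txnLedgers.remove(txnId);
    }

    List<String> visibleMessages() {
        return new ArrayList<>(dataLedger);
    }

    public static void main(String[] args) {
        TxnLedgerSketch store = new TxnLedgerSketch();
        long a = store.begin();
        long b = store.begin();
        store.append(a, "a1");
        store.append(b, "b1");
        store.append(a, "a2");
        store.commit(a);
        store.abort(b);
        System.out.println(store.visibleMessages()); // [a1, a2]
    }
}
```

Because each transaction writes to its own buffer, committed and uncommitted messages never interleave in the data ledger, which is the property the steps are after.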
>>>>> 
>>>>> This would produce a much cleaner design than Kafka's.
>>>>> 
>>>>> On Ivan's comments:
>>>>> 
>>>>>> Transactional acknowledgement also needs to be taken into account
>>>>> 
>>>>> I don't think we have to treat `transactional acknowledgement` as a
>>>>> special case. Currently, acknowledgements are actually "append"
>>>>> operations into cursor ledgers. So the problem can be reduced to an
>>>>> `atomic append` to both data ledgers and cursor ledgers. That way, we
>>>>> can use one solution for both appending data and updating cursors.
>>>>> 
>>>>> Additionally, I think a related topic is supporting large messages
>>>>> (e.g. >= 5MB). If we take the approach I described above, using a
>>>>> separate ledger to accumulate the messages of a transaction, then it
>>>>> is easy to model a large message as a transaction of chunked messages.
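
One way to picture a large message as a transaction of chunks, as a minimal sketch. `ChunkingSketch`, `split`, and `join` are hypothetical helpers, and the chunk size is an arbitrary parameter; in the scheme above each chunk would be appended to the txn ledger and the message would become visible only on commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ChunkingSketch {
    // Splits a payload into chunks of at most maxChunkSize bytes, in order.
    static List<byte[]> split(byte[] payload, int maxChunkSize) {
        if (maxChunkSize <= 0) {
            throw new IllegalArgumentException("maxChunkSize must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        for (int off = 0; off < payload.length; off += maxChunkSize) {
            int end = Math.min(payload.length, off + maxChunkSize);
            chunks.add(Arrays.copyOfRange(payload, off, end));
        }
        return chunks;
    }

    // Reassembles the original payload from its chunks.
    static byte[] join(List<byte[]> chunks) {
        int total = 0;
        for (byte[] chunk : chunks) {
            total += chunk.length;
        }
        byte[] out = new byte[total];
        int pos = 0;
        for (byte[] chunk : chunks) {
            System.arraycopy(chunk, 0, out, pos, chunk.length);
            pos += chunk.length;
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] payload = "hello world".getBytes();
        List<byte[]> chunks = split(payload, 4);
        System.out.println(chunks.size());                         // 3
        System.out.println(Arrays.equals(payload, join(chunks)));  // true
    }
}
```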
>>>>> 
>>>>> @Richard, @Ivan let me know what you think. If you think the
>>>>> direction I raised is a good one to go down, I am happy to write it
>>>>> up in detail, drive the design, and coordinate the implementation in
>>>>> the community.
>>>>> 
>>>>> - Sijie
>>>>> 
>>>>> On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <yohan.richard...@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi all,
>>>>>> 
>>>>>> We might be able to get some ideas on implementing this from Kafka:
>>>>>> 
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
>>>>>> 
>>>>>> Obviously, there are some differences between Kafka and Pulsar internals,
>>>>>> but at some level the implementation would be similar.
>>>>>> It should help.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <yohan.richard...@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi,
>>>>>>> 
>>>>>>> Per request, I've created a doc so we could get some more input in an
>>>>>>> organized manner:
>>>>>>> 
>>>>>>> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
>>>>>>> 
>>>>>>> As for Ivan's questions, my answers are below.
>>>>>>> 
>>>>>>>> By "set the message to unknown", do you mean the broker will cache
>>>>>>>> the message, not writing it to any log?
>>>>>>> 
>>>>>>> From my interpretation of the steps, we wouldn't cache the message.
>>>>>>> What the producer sends first is a pre-processing message, not the
>>>>>>> real message itself. This step basically notifies the broker that the
>>>>>>> message is on its way. So all we have to do is store the message id
>>>>>>> and its corresponding status in a map; the status then changes
>>>>>>> depending on the producer's response.
>>>>>>> 
>>>>>>>> In designs we've discussed previously, this was handled
>>>>>>>> by a component called the transaction coordinator, which is a
>>>>>>>> logical component which each broker knows how to talk to. For a
>>>>>>>> transaction the commit message is sent to the coordinator, which
>>>>>>>> writes it to its own log, and then goes through each topic in the
>>>>>>>> commit and marks the transaction as completed.
>>>>>>> 
>>>>>>> I wasn't aware of previous discussions on this topic, but it seems
>>>>>>> pretty good to me. It's certainly better than what I would come up with.
>>>>>>> If there is anything more we need to talk about, I suppose we could
>>>>>>> move it to the google doc to play around with.
>>>>>>> 
>>>>>>> Hope we can get this PIP rolling.
>>>>>>> 
>>>>>>> 
>>>>>>> On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <guosi...@gmail.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Richard,
>>>>>>>> 
>>>>>>>> Thank you for putting this up and pushing the discussion forward.
>>>>>>>> 
>>>>>>>> I think this is a very large feature. It might be worth creating a
>>>>>>>> google doc for it (which is better for collaboration). And I believe
>>>>>>>> Ivan has some thoughts as well. If you can put up a google doc (make
>>>>>>>> it world-editable), Ivan can probably dump his thoughts there and we
>>>>>>>> can finalize the discussion and break it down into tasks. That way
>>>>>>>> the whole community can actually collaborate on this.
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Sijie
>>>>>>>> 
>>>>>>>> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <yohan.richard...@gmail.com>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hi all,
>>>>>>>>> 
>>>>>>>>> I would like to create a PIP for issue #2664 on Github. The
>>>>>>>>> details of the PIP are below.
>>>>>>>>> I hope we can discuss this thoroughly.
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> Richard
>>>>>>>>> 
>>>>>>>>> PIP-31: Add support for transactional messaging
>>>>>>>>> 
>>>>>>>>> Motivation: Pulsar could improve on how it sends messages by
>>>>>>>>> implementing transactional messaging. Transactional messaging
>>>>>>>>> enforces eventual consistency within the system and allows
>>>>>>>>> operations to be performed atomically.
>>>>>>>>> 
>>>>>>>>> Proposal:
>>>>>>>>> 
>>>>>>>>> As described in the issue, we would implement the following policy
>>>>>>>>> in the Producer and the Pulsar Broker:
>>>>>>>>> 1. The producer produces the pre-processing transaction message. At
>>>>>>>>> this point, the broker sets the status of this message to unknown.
>>>>>>>>> 2. After the local transaction is successfully executed, the commit
>>>>>>>>> message is sent; otherwise the rollback message is sent.
>>>>>>>>> 3. The broker receives the message. If it is a commit message, it
>>>>>>>>> changes the transaction status to commit and then sends the actual
>>>>>>>>> message to the consumer queue. At this point, the consumer can
>>>>>>>>> consume the message. Otherwise, the transaction status is changed to
>>>>>>>>> rollback and the message is discarded.
>>>>>>>>> 4. If the producer is down or behaving abnormally at step 2, the
>>>>>>>>> broker periodically asks that producer for the status of the message,
>>>>>>>>> updates the status according to the producer's response, and then
>>>>>>>>> processes the message as in step 3.
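
Steps 1 to 3 amount to a small status map on the broker, sketched below. All names are hypothetical, and the sketch assumes the payload arrives with the commit message, which the proposal leaves open. Step 4 (the broker polling the producer) is not covered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TxnStatusSketch {
    enum Status { UNKNOWN, COMMITTED, ROLLBACK }

    // Message id -> current transaction status.
    private final Map<Long, Status> statusById = new HashMap<>();
    // Messages that consumers are allowed to see.
    private final List<String> consumerQueue = new ArrayList<>();

    // Step 1: the pre-processing message registers the id as UNKNOWN.
    void prepare(long id) {
        statusById.put(id, Status.UNKNOWN);
    }

    // Step 3, commit branch: mark COMMITTED and hand the message to consumers.
    // Assumption: the payload travels with the commit message.
    void commit(long id, String payload) {
        requireUnknown(id);
        statusById.put(id, Status.COMMITTED);
        consumerQueue.add(payload);
    }

    // Step 3, rollback branch: mark ROLLBACK; nothing reaches consumers.
    void rollback(long id) {
        requireUnknown(id);
        statusById.put(id, Status.ROLLBACK);
    }

    Status statusOf(long id) {
        return statusById.get(id);
    }

    List<String> consumerQueue() {
        return new ArrayList<>(consumerQueue);
    }

    // Only a prepared, still-undecided message may be committed or rolled back.
    private void requireUnknown(long id) {
        if (statusById.get(id) != Status.UNKNOWN) {
            throw new IllegalStateException("message " + id + " is not in UNKNOWN state");
        }
    }

    public static void main(String[] args) {
        TxnStatusSketch broker = new TxnStatusSketch();
        broker.prepare(1);
        broker.prepare(2);
        broker.commit(1, "hello");
        broker.rollback(2);
        System.out.println(broker.statusOf(1));      // COMMITTED
        System.out.println(broker.statusOf(2));      // ROLLBACK
        System.out.println(broker.consumerQueue());  // [hello]
    }
}
```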
>>>>>>>>> 
>>>>>>>>> Specific concerns:
>>>>>>>>> There are a number of things we will improve upon or add:
>>>>>>>>> - A configuration called ```maxMessageUnknownTime```. Consider this
>>>>>>>>> scenario: the pre-processing transaction message is sent, but the
>>>>>>>>> commit or rollback message is never received, which could mean that
>>>>>>>>> the status of a message would be permanently unknown. To prevent
>>>>>>>>> this, we would need a config which limits the amount of time the
>>>>>>>>> status of a message can be unknown (i.e.
>>>>>>>>> ```maxMessageUnknownTime```). After that, the message would be
>>>>>>>>> discarded.
>>>>>>>>> - Logging would be updated to log the status of a message, i.e.
>>>>>>>>> UNKNOWN, ROLLBACK, or COMMITTED. This would allow the user to know
>>>>>>>>> whether or not a message had failed or fallen through.
>>>>>>>>> 
>>>>>>>>> Possible Additional API:
>>>>>>>>> - We would add a method which allows the user to query the state of
>>>>>>>>> the message, i.e. ```getStateOfMessage(long id)```
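
A sketch of how ```maxMessageUnknownTime``` and ```getStateOfMessage(long id)``` might fit together. Only those two names come from the proposal; the class, the other methods, and the choice to return an empty `Optional` for a discarded message are assumptions. Time is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class MessageStateSketch {
    enum State { UNKNOWN, COMMITTED, ROLLBACK }

    private final long maxMessageUnknownTimeMillis;
    private final Map<Long, State> states = new HashMap<>();
    // When each still-undecided message entered the UNKNOWN state.
    private final Map<Long, Long> unknownSince = new HashMap<>();

    MessageStateSketch(long maxMessageUnknownTimeMillis) {
        this.maxMessageUnknownTimeMillis = maxMessageUnknownTimeMillis;
    }

    void prepare(long id, long nowMillis) {
        states.put(id, State.UNKNOWN);
        unknownSince.put(id, nowMillis);
    }

    // Records the commit or rollback decision for a message that is still UNKNOWN.
    void resolve(long id, State finalState) {
        if (states.get(id) == State.UNKNOWN) {
            states.put(id, finalState);
            unknownSince.remove(id);
        }
    }

    // Discards every message that has been UNKNOWN for longer than the limit.
    void expire(long nowMillis) {
        unknownSince.entrySet().removeIf(entry -> {
            boolean expired = nowMillis - entry.getValue() > maxMessageUnknownTimeMillis;
            if (expired) {
                states.remove(entry.getKey());
            }
            return expired;
        });
    }

    // Empty when the id was never seen or the message has been discarded.
    Optional<State> getStateOfMessage(long id) {
        return Optional.ofNullable(states.get(id));
    }

    public static void main(String[] args) {
        MessageStateSketch broker = new MessageStateSketch(1000);
        broker.prepare(1, 0);
        broker.prepare(2, 0);
        broker.resolve(1, State.COMMITTED);
        broker.expire(1001);
        System.out.println(broker.getStateOfMessage(1)); // Optional[COMMITTED]
        System.out.println(broker.getStateOfMessage(2)); // Optional.empty
    }
}
```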
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>>> 
>>>> --
>>>> -Ali
>>>> 
>> 
>> 
