Hi -

Is this a case where a Pulsar function base class for transactions would help?

Regards,
Dave

Sent from my iPhone

> On Mar 2, 2019, at 2:39 AM, Sijie Guo <guosi...@gmail.com> wrote:
> 
> Pravega's model is a better model than Kafka - it addressed the
> interleaving problems. However Pravega's model is based on a giant
> replicated log and rewrite the data to a second tiered storage for
> persistence, which basically re-implemented bookkeeper's logic in broker. A
> fundamental drawback of Pravega is write amplifications. The amplifications
> of both network and IO bandwidth are huge. If you use bookkeeper both for
> its first-and-second tier storage and assume the bookkeeper replication
> factor is 3, pravega requires 6x network bandwidth and 12x IO bandwidth.
> For a given message, it needs to write 3 times into the journal, and 3
> times for persistent. The amplifications hugely limit the throughput at
> pravega "brokers".
> 
> - Sijie
> 
> 
> 
>> On Sat, Mar 2, 2019 at 6:13 PM Ali Ahmed <ahmal...@gmail.com> wrote:
>> 
>> I agree we many want to review pravega's past efforts in this area also.
>> 
>> 
>> https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
>> 
>> https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java
>> 
>> -Ali
>> 
>>> On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <guosi...@gmail.com> wrote:
>>> 
>>> Kafka's implementation is interleaving committed messages with
>> uncommitted
>>> messages at storage. Personally I think it is a very ugly design and
>>> implementation.
>>> 
>>> Pulsar is a segment centric system, where we have a shared segment
>> storage
>>> - bookkeeper. I think a better direction is to leverage the segments (aka
>>> ledgers)
>>> for buffering uncommitted messages and commit the whole segment when the
>>> whole transaction is committed.
>>> 
>>> A rough idea would be:
>>> 
>>> 1) for any transaction, write the messages to a separate ledger (or
>>> multiple separate ledger).
>>> 2) during the transaction, accumulates the messages in those ledgers.
>>> 3) when commit, merge the txn ledgers back to the main data ledger. the
>>> merge can be done either adding a meta message where data is stored in
>> the
>>> txn ledger or actually copying the data to data ledger (depending on the
>>> size of data accumulate in the transaction).
>>> 4) when abort, delete the txn ledger. No other additional work to be
>> done.
>>> 
>>> This would be producing a much clear design than Kafka.
>>> 
>>> On Ivan's comments:
>>> 
>>>> Transactional acknowledgement also needs to be taken into account
>>> 
>>> I don't think we have to treat `transactional acknowledgement` as a
>> special
>>> case. currently `acknowledgment` are actually "append" operations into
>>> cursor ledgers.
>>> So the problem set can be reduced as `atomic append` to both data ledgers
>>> and cursor ledgers. in that way, we can use one solution for handling
>>> appending data and updating cursors.
>>> 
>>> Additionally, I think a related topic about transactions would be
>>> supporting large sized message (e.g. >= 5MB). If we take the approach I
>>> described above using a separated ledger for accumulating messages for a
>>> transaction, that we are easy to model a large size message as a
>>> transaction of chunked messages.
>>> 
>>> @Richard, @Ivan let me know what do you think. If you guys think the
>>> direction I raised is a good one to go down, I am happy to write them
>> down
>>> into details, and drive the design and coordinate the implementations in
>>> the community.
>>> 
>>> - Sijie
>>> 
>>> On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <yohan.richard...@gmail.com>
>>> wrote:
>>> 
>>>> Hi all,
>>>> 
>>>> We might be able to get some ideas on implementing this from Kafka:
>>>> 
>>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
>>>> 
>>>> Obviously, there is some differences in Kafka and Pulsar internals but
>> at
>>>> some level, the implementation would be similar.
>>>> It should help.
>>>> 
>>>> 
>>>> 
>>>> On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <yohan.richard...@gmail.com
>>> 
>>>> wrote:
>>>> 
>>>>> Hi,
>>>>> 
>>>>> Per request, I've created a doc so we could get some more input in an
>>>>> organized manner:
>>>>> 
>>>>> 
>>>> 
>>> 
>> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
>>>>> 
>>>>> And for Ivan's questions, I would answer accordingly.
>>>>> 
>>>>>> By "set the message to unknown", do you mean the broker will cache
>> the
>>>>>> message, not writing it to any log?
>>>>> 
>>>>> We wouldn't cache the message from my interpretation of the steps.
>> What
>>>>> the producer is first sending is a pre-processing message, not the
>> real
>>>>> message itself. This step basically notifies the broker that the
>>> message
>>>> is
>>>>> on its way. So all we have to do is store the message id and its
>>>>> corresponding status in a map, and depending on the producer's
>>> response,
>>>>> the status will change accordingly.
>>>>> 
>>>>>> In designs we've discussed previously, this was handled
>>>>>> by a component called the transaction coordinator, which is a
>> logical
>>>>>> component which each broker knows how to talk to. For a transaction
>>>>>> the commit message is sent to the coordinator, which writes it to
>> its
>>>>>> own log, and then goes through each topic in the commit and marks
>> the
>>>>>> transaction as completed.
>>>>> 
>>>>> I wasn't aware of previous discussions on this topic, but it seems
>>> pretty
>>>>> good to me. It's certainly better than what I would come up with.
>>>>> If there's any more things we need to talk about, I suppose we could
>>> move
>>>>> it to the google doc to play around with.
>>>>> 
>>>>> Hope we can get this PIP rolling.
>>>>> 
>>>>> 
>>>>> On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <guosi...@gmail.com>
>> wrote:
>>>>> 
>>>>>> Richard,
>>>>>> 
>>>>>> Thank you for putting this put and pushing the discussion forward.
>>>>>> 
>>>>>> I think this is a very large feature. It might be worth creating a
>>>> google
>>>>>> doc for it (which is better for collaboration). And I believe Ivan
>> has
>>>>>> some
>>>>>> thoughts as well. If you can put up a google doc (make it
>>>> world-editable),
>>>>>> Ivan can probably dump his thoughts there and we can finalize the
>>>>>> discussion and break down into tasks. So the whole community can
>>>> actually
>>>>>> work together at collaborating this.
>>>>>> 
>>>>>> Thanks,
>>>>>> Sijie
>>>>>> 
>>>>>> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <
>>> yohan.richard...@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi all,
>>>>>>> 
>>>>>>> I would like to create a PIP for issue #2664 on Github. The
>> details
>>> of
>>>>>> the
>>>>>>> PIP are below.
>>>>>>> I hope we could discuss this thoroughly.
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Richard
>>>>>>> 
>>>>>>> PIP-31: Add support for transactional messaging
>>>>>>> 
>>>>>>> Motivation: Pulsar currently could improve upon their system of
>>>> sending
>>>>>>> packets of data by implementing transactional messaging. This
>> system
>>>>>>> enforces eventual consistency within the system, and allows
>>> operations
>>>>>> to
>>>>>>> be performed atomically.
>>>>>>> 
>>>>>>> Proposal:
>>>>>>> 
>>>>>>> As described in the issue, we would implement the following policy
>>> in
>>>>>>> Producer and Pulsar Broker:
>>>>>>> 1. The producer produces the pre-processing transaction message.
>> At
>>>> this
>>>>>>> point, the broker will set the status of this message to unknown.
>>>>>>> 2. After the local transaction is successfully executed, the
>> commit
>>>>>> message
>>>>>>> is sent, otherwise the rollback message is sent.
>>>>>>> 3. The broker receives the message. If it is a commit message, it
>>>>>> modifies
>>>>>>> the transaction status to commit, and then sends an actual message
>>> to
>>>>>> the
>>>>>>> consumer queue. At this time, the consumer can consume the
>> message.
>>>>>>> Otherwise, the transaction status is modified to rollback. The
>>> message
>>>>>> will
>>>>>>> be discarded.
>>>>>>> 4. If at step 2, the producer is down or abnormal, at this time,
>> the
>>>>>> broker
>>>>>>> will periodically ask the specific producer for the status of the
>>>>>> message,
>>>>>>> and update the status according to the producer's response, and
>>>> process
>>>>>> it
>>>>>>> according to step 3, the action that comes down.
>>>>>>> 
>>>>>>> Specific concerns:
>>>>>>> There are a number of things we will improve upon or add:
>>>>>>> - A configuration called ```maxMessageUnknownTime```. Consider
>> this
>>>>>>> scenario: the pre-processing transaction message is sent, but the
>>>>>> commit or
>>>>>>> rollback message is never received, which could mean that the
>> status
>>>> of
>>>>>> a
>>>>>>> message would be permanently unknown. To avoid this from
>> happening,
>>> we
>>>>>>> would need a config which limits the amount of time the status of
>> a
>>>>>> message
>>>>>>> could be unknown (i.e. ```maxMessageUnknownTime```) After that,
>> the
>>>>>> message
>>>>>>> would be discarded.
>>>>>>> - Logging would be updated to log the status of a message i.e.
>>>> UNKNOWN,
>>>>>>> ROLLBACK, or COMMITTED. This would allow the user to know whether
>> or
>>>>>> not a
>>>>>>> message had failed or fallen through.
>>>>>>> 
>>>>>>> Possible Additional API:
>>>>>>> - We would add a method which allows the user to query the state
>> of
>>>> the
>>>>>>> message i.e. ```getStateOfMessage(long id)```
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
>> 
>> --
>> -Ali
>> 

Reply via email to