Hi all, A File-Based DTX store was implemented using LevelDB to store
distributed transaction related data. As XA transaction (two-phase commit)
is used to handle the distributed transaction in MB, the related xid along
with node id and branch id should be stored. Each transaction consists of
message enqueue/dequeue (or both) operations. In the prepare phase of the
transaction, metadata and the content of the messages which should be
enqueued/dequeued have to be stored temporarily. When the transaction is in
the commit phase, enqueue/dequeue operations of the transaction should be
committed and temporary records during the prepare phase should be deleted.
LevelDB WriteBatch[1] was used to implement these operations.

Following is the key schema for the DTX Store. These keys are defined to
hold the related data temporary in the prepare phase of the transaction.

Key Schema :

Key

Value

DTX_ENTRY : $xid: $node_id : BRANCH_ID

Branch ID

DTX_ENTRY : $xid : $node_id : FORMAT_CODE

Format code

DTX_ENTRY : $xid : $node_id : GLOBAL_ID

Global ID

DTX_ENQUEUE : $xid : MESSAGE_METADATA : $message_id

Metadata of the message which should be enqueued

DTX_ENQUEUE : $xid : MESSAGE_CONTENT : $message_id : $offset

Content chunk of the message which should be enqueued

DTX_DEQUEUE : $xid : MESSAGE_METADATA : $message_id

Metadata of the message which should be dequeued

DTX_DEQUEUE : $xid :  MESSAGE_CONTENT : $message_id : $offset

Content chunk of the message which should be dequeued

DTX_DEQUEUE : $xid : DESTINATION_NAME : $message_id

Destination of the message which should be dequeued

Implementation :

The implementation of the file based DTX Store is done by implementing
methods in DtxStore interface.[2]


[1]https://github.com/google/leveldb/blob/master/doc/index.md#atomic-updates
[2] Pull Request :  https://github.com/wso2/andes/pull/944/files
<https://github.com/wso2/andes/pull/944/files>
Best Regards,

On Fri, Nov 10, 2017 at 11:53 AM, Wishmitha Mendis <[email protected]>
wrote:

> Hi all,
>
> The key schema for LevelDB store was updated due to following reasons.
>
>    1.
>
>    Reduce data replication
>    2.
>
>    Minimize the lookup time for iterators when retrieving data
>
>
> When updating the schema lexicographically ordered storage mechanism of
> LevelDB was taken into consideration.
>
> Updated Key Schema :
>
> Key
>
> Value
>
> MESSAGE.$message_id.$content_offset.MESSAGE_CONTENT
>
> Message content chunk
>
> MESSAGE.$message_id.DESTINATION_NAME
>
> Name of the queue/topic where the message is stored
>
> MESSAGE.$message_id.DLC_STATUS
>
> “1” if the message is in dead letter channel, else “0”
>
> DESTINATION.MESSAGE_COUNT.$destination_name
>
> Message count of the queue/topic
>
> DESTINATION.$destination_name.MESSAGE_METADATA.$message_id
>
> Metadata of the message which is stored in the queue/topic.
>
> DESTINATION.$destination_name.MESSAGE_EXPIRATION_TIME.$message_id
>
> Expiration time of the message which is stored in the queue/topic.
>
> Comparison :
>
> When compared to the previous schema, this schema has less data
> repetition. Previously, the message metadata and expiration time were
> stored both under the message and the destination, whereas in this schema
> metadata and expiration time are stored only under the destination.
>
> Attributes like QUEUE_ID, DLC_QUEUE_ID were removed from the schema, as
> all the queues/topics are guaranteed to have a unique name.
>
> In the previous key schema, metadata and expiration time of a message in a
> destination were stored successively.
>
> [image: Screenshot from 2017-11-10 10-58-12.png]
>
> With this schema, when an iterator looks up for all message metadata in a
> destination, it will have to traverse through all the keys including the
> keys related to expiration time as well. This lookup time can be reduced by
> separating expiration time and message metadata as follows.
>
> [image: Screenshot from 2017-11-10 11-06-40.png]
>
> Now the iterator does not have to traverse through expiration time related
> keys as they are separated from the message metadata related keys. This is
> used in the updated schema. Hence the lookup time for both metadata and
> expiration time will be halved compared to the previous schema.
>
> As the total number of keys per message is reduced in this schema,
> publisher rates will be increased too.
>
> Dead Letter Channel :
>
> Dead letter channel is considered as a destination(queue) where messages
> will be moved to it, once the maximum redelivery attempts are reached. The 
> DLC_STATUS
> of the message will be updated from "0" to “1”, once the message is moved
> to the dead letter channel.
>
> On Wed, Aug 16, 2017 at 5:24 PM, Hasitha Hiranya <[email protected]>
> wrote:
>
>> Hi Asanka and all,
>>
>> On Wed, Aug 16, 2017 at 12:08 PM, Asanka Abeyweera <[email protected]>
>> wrote:
>>
>>> Hi Wishmitha,
>>>
>>> On Tue, Aug 15, 2017 at 5:34 PM, Wishmitha Mendis <[email protected]>
>>> wrote:
>>>
>>>> Hi Asanka,
>>>>
>>>> 1. We can initially use a Network File System as you mentioned for HA.
>>>> However data replication in LevelDB is used in ActiveMQ replicated store.
>>>> [1]
>>>>
>>>
>>> Then It is better to run the performance test with a NFS to really
>>> compare the results.
>>>
>>
>> +1 for this. We have seen ActiveMQ also deliver low performance when we
>> do this.
>> For distributed brokering scenarios, how ActiveMQ does it is, using
>> broker-to-broker communication [1]. Each broker node has its own file based
>> storage system, and message is routed looking at where the real consumer
>> is.
>>
>> [1]. https://dzone.com/articles/active-mq-network-brokers
>>
>>>
>>>
>>>> 2. Yes, the iterator should traverse all the related keys to get
>>>> message data. This is why the key schema is designed in a such way that all
>>>> the messages in a queue are stored successively to reduce the traversing
>>>> time. And as you said it is much more complex than the RDBMS as the data
>>>> cannot be retrieved by simply executing a query. However, LevelDB can still
>>>> provide much faster data retrieval rates than RDBMS due to its high
>>>> performances. Hence, even though the data retrieving operation is complex,
>>>> it is not a much of an overhead when it comes to overall performances.
>>>>
>>>
>>> What happens if we have lot of messages for a queue/queues which is
>>> stored before the interested queue. Shouldn't we traverse until we find the
>>> interested queue. If my understanding is correct that will result in
>>> consuming a considerable amount of resources (CPU, bancdwith, etc). RDBMS
>>> get around this by maintaining indexes. Do we have something similar in
>>> leveldb?
>>>
>>>
>>>>
>>>> (Additional : Most of the RDBMS engines use file based stores
>>>> underneath. As an example, LevelDB is used as a database engine in MariaDB.
>>>> [2] Hence, even when executing a query in RDBMS, these kind of traversing
>>>> operations may occur underneath.)
>>>>
>>>> 3. Data cannot be inserted while traversing. Actually traversing
>>>> through the keys is done by an iterator which should be eventually closed
>>>> after the operation is completed. A sample code is represented below. [3]
>>>>
>>>> DBIterator iterator = db.iterator();
>>>> try {
>>>>   for(iterator.seekToFirst(); iterator.hasNext(); iterator.next()) {
>>>>     String key = asString(iterator.peekNext().getKey());
>>>>     String value = asString(iterator.peekNext().getValue());
>>>>     System.out.println(key+" = "+value);
>>>>   }
>>>> } finally {
>>>>   // Make sure you close the iterator to avoid resource leaks.
>>>>   iterator.close();
>>>> }
>>>>
>>>> These iterators are mainly used in methods such as getMetaDataList()
>>>> and deleteMessages() in the implementation. The iterator should be closed
>>>> in those methods, as displayed in the above code.
>>>>
>>>
>>> Isn't that a bottleneck when there are concurrent consumers and
>>> publishers? Data writing threads will have to wait until the data reading
>>> thread is done which can result in lower performance numbers when there are
>>> multiple queues with multiple consumers and publishers.
>>>
>>>
>>>>
>>>> 4. Yes this will be a performance limitation. The throughput get
>>>> reasonably low when publishing/retrieving messages in multi-threaded
>>>> environment. Even though LevelDB is capable of providing higher throughput
>>>> than RDBMS even in a multi-threaded environment according to the test
>>>> results, that can be a bottleneck in concurrent access of DB. Main purpose
>>>> of this PoC is actually develop a generic key schema, so that we can switch
>>>> between and select the optimal file based store for the message broker.
>>>>
>>>> 5. LevelDB does not have an inbuilt transaction support. Therefore
>>>> transactions should implemented as a external layer within the application.
>>>> Currently I am working on this and exploring how the transactions are
>>>> implemented in ActiveMQ LevelDB store. [4] I will post a separate thread on
>>>> LevelDB transactions.
>>>>
>>>
>>> Maybe before implementing a transaction layer, we should do a proper
>>> performance test and decide on the path forward.
>>>
>>> WDYT?
>>>
>>>
>>>>
>>>>
>>>> [1] http://activemq.apache.org/replicated-leveldb-store.html
>>>> [2] https://mariadb.com/kb/en/mariadb/leveldb/
>>>> [3] https://github.com/fusesource/leveldbjni
>>>> [4] https://github.com/apache/activemq/tree/master/activemq-
>>>> leveldb-store
>>>>
>>>> On Tue, Aug 15, 2017 at 11:21 AM, Asanka Abeyweera <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Wishmitha,
>>>>>
>>>>>    1. How are we going to support HA deployment with LevelDB? Are we
>>>>>    going to use a network file system or replicate data?
>>>>>    2. If we wanted to get a set of message matching a given queue ID,
>>>>>    do we have to traverse all messages to get that? In RDBMS this is 
>>>>> easier to
>>>>>    do with a where clause.
>>>>>    3. What happens if we insert data while traversing?
>>>>>    4. It seems "*only a single process (possibly multi-threaded) can
>>>>>    access a particular database at a time*"[1] in LevelDB. Will this
>>>>>    be a bottleneck when we need to access the DB concurrently?
>>>>>    5. How are the transactions handled in LevelDB? When
>>>>>    implementing distributed transactions feature we required row
>>>>>    level locking instead of table level locking. Does LevelDB support 
>>>>> that?
>>>>>
>>>>> [1] https://github.com/google/leveldb
>>>>>
>>>>> On Tue, Aug 15, 2017 at 10:47 AM, Wishmitha Mendis <[email protected]
>>>>> > wrote:
>>>>>
>>>>>> Hi Sumedha,
>>>>>>
>>>>>> The Java library for LevelDB (leveldbjni) creates the database as
>>>>>> follows as mentioned in the docs. [1]
>>>>>>
>>>>>> Options options = new Options();
>>>>>> DB db = factory.open(new File("example"), options);
>>>>>>
>>>>>> This will create the database in a directory on a given path. And in
>>>>>> the library docs, it is mentioned that the library supports several
>>>>>> platforms if not specifically configured. Therefore using this library 
>>>>>> does
>>>>>> not require to ship LevelDB and it also won't take away platform
>>>>>> agnostic installation capability of MB. However the implementation is
>>>>>> currently only tested on Linux, I will test it on Windows and other
>>>>>> platforms and let you know.
>>>>>>
>>>>>> When considering the LevelDB architecture, it is already used as a
>>>>>> broker store in ActiveMQ. [2] [3] This proves that LevelDB has the
>>>>>> architectural capability to efficiently insert and delete messages in a
>>>>>> broker.
>>>>>>
>>>>>> [1] https://github.com/fusesource/leveldbjni
>>>>>> [2] http://activemq.apache.org/leveldb-store.html
>>>>>> [3] https://github.com/apache/activemq/tree/master/activemq-
>>>>>> leveldb-store
>>>>>>
>>>>>> Best Regards,
>>>>>>
>>>>>> On Tue, Aug 15, 2017 at 2:29 AM, Sumedha Rubasinghe <[email protected]
>>>>>> > wrote:
>>>>>>
>>>>>>> Hi Wishmitha,
>>>>>>> Would leveldb architecture be efficient for a message broker where
>>>>>>> removing delivered messages is very frequent?
>>>>>>>
>>>>>>> This requires WSO2 Message Broker to ship leveldb. leveldb (
>>>>>>> https://github.com/google/leveldb) has native distributions for
>>>>>>> platforms. AFAIC this will take away platform agnostic installation
>>>>>>> capability of MB.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Aug 15, 2017 at 2:20 AM, Wishmitha Mendis <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I am working on a project to replace the current RDBMS based
>>>>>>>> database of the message broker store with a file based database system.
>>>>>>>> Currently the implementation is carried out in LevelDB which is a 
>>>>>>>> key-value
>>>>>>>> based data store. The following is an explanation of suggested key 
>>>>>>>> schema
>>>>>>>> for the data store with related design decisions.
>>>>>>>>
>>>>>>>> *Overview :*
>>>>>>>>
>>>>>>>> LevelDB is a key value based database where a value can be stored
>>>>>>>> under a certain unique key. This key-value mapping is one directional 
>>>>>>>> which
>>>>>>>> means a value only can be retrieved by accessing corresponding key. 
>>>>>>>> One of
>>>>>>>> the main features in LevelDB is that it stores keys in a 
>>>>>>>> lexicographically
>>>>>>>> (alphabetically) sorted order. All the keys and values are stored in 
>>>>>>>> byte
>>>>>>>> array format in the database store which should be accordingly 
>>>>>>>> converted to
>>>>>>>> string format within the application.
>>>>>>>>
>>>>>>>> For this LevelDB store implementation leveldbjni-1.8[1] is used
>>>>>>>> which provides a Java based API for LevelDB by providing following main
>>>>>>>> functionalities.
>>>>>>>>
>>>>>>>>
>>>>>>>>    1.
>>>>>>>>
>>>>>>>>    put(key,value) : stores given value under the provided key
>>>>>>>>    2.
>>>>>>>>
>>>>>>>>    get(key) : returns corresponding value to the key
>>>>>>>>    3.
>>>>>>>>
>>>>>>>>    delete(key) : deletes given key
>>>>>>>>    4.
>>>>>>>>
>>>>>>>>    batch() : provides atomicity for the operations
>>>>>>>>    5.
>>>>>>>>
>>>>>>>>    iterator() : traverse through the stored keys
>>>>>>>>
>>>>>>>>
>>>>>>>> When designing the key schema in Level DB the following factors are
>>>>>>>> mainly considered.
>>>>>>>>
>>>>>>>>
>>>>>>>>    1.
>>>>>>>>
>>>>>>>>    Lexicographical order of the stored keys
>>>>>>>>    2.
>>>>>>>>
>>>>>>>>    Traversing through the keys
>>>>>>>>    3.
>>>>>>>>
>>>>>>>>    Data organization
>>>>>>>>
>>>>>>>>
>>>>>>>> *Key Schema :*
>>>>>>>>
>>>>>>>> The key schema implementation was carried out for following tables
>>>>>>>> of the current RDBMS database.
>>>>>>>>
>>>>>>>> [image: Screenshot from 2017-08-14 01-13-33.png]
>>>>>>>>
>>>>>>>> The key schema is mainly designed by analyzing implemented queries
>>>>>>>> for data retrieval and inserting in the RDBMS. The key schema for above
>>>>>>>> three tables is represented below table.
>>>>>>>>
>>>>>>>>
>>>>>>>> [image: Screenshot from 2017-08-15 02-11-24.png]
>>>>>>>>
>>>>>>>> *Key : Value*
>>>>>>>>
>>>>>>>> *Purpose*
>>>>>>>>
>>>>>>>> MESSAGE.$message_id.QUEUE_ID : queue_id
>>>>>>>>
>>>>>>>> Stores queue id of the message.
>>>>>>>>
>>>>>>>> MESSAGE.$message_id.DLC_QUEUE_ID : dlc_queue_id
>>>>>>>>
>>>>>>>> Stores dlc queue id of the message.
>>>>>>>>
>>>>>>>> MESSAGE.$message_id.MESSAGE_METADATA : message_metadata
>>>>>>>>
>>>>>>>> Stores metadata of the message.
>>>>>>>>
>>>>>>>> MESSAGE.$message_id.$content_offset.MESSAGE_CONTENT :
>>>>>>>> message_content
>>>>>>>>
>>>>>>>> Stores message content for a given message offset of the message.
>>>>>>>>
>>>>>>>> QUEUE.$queue_id.QUEUE_NAME : queue_name
>>>>>>>>
>>>>>>>> Stores name of the queue under the id.
>>>>>>>>
>>>>>>>> QUEUE.$queue_name.QUEUE_ID : queue_id
>>>>>>>>
>>>>>>>> Stores id of the queue under the name.
>>>>>>>>
>>>>>>>> QUEUE.$queue_name.message_id. MESSAGE_METADATA : message_metadata
>>>>>>>>
>>>>>>>> Stores metadata of the messages which belongs to the queue.
>>>>>>>>
>>>>>>>> LAST_MESSAGE_ID
>>>>>>>>
>>>>>>>> Stores last message id.
>>>>>>>>
>>>>>>>> LAST_QUEUE_ID
>>>>>>>>
>>>>>>>> Stores last queue id.
>>>>>>>>
>>>>>>>> As it can be seen some data repetition is higher when using this
>>>>>>>> schema. That is mainly due to one directional key-value mapping of 
>>>>>>>> LevelDB.
>>>>>>>> As an example two keys (QUEUE.$queue_id.QUEUE_NAME ,
>>>>>>>> QUEUE.$queue_name.QUEUE_ID) are required to build the
>>>>>>>> bidirectional relation (get queue name given queue id and get
>>>>>>>> queue id given queue name) between queue name and the queue id. As
>>>>>>>> LevelDB has better writing performances than RDBMS data repetition may 
>>>>>>>> not
>>>>>>>> be an much of an overhead in inserting data. Moreover batch operations 
>>>>>>>> can
>>>>>>>> be used in multiple insertions.
>>>>>>>>
>>>>>>>> The main purpose of using of prefixes like MESSAGE and QUEUE in
>>>>>>>> keys is to organize them properly. As LevelDB stores keys 
>>>>>>>> lexicographically
>>>>>>>> these prefixes will make sure that message related and queue related 
>>>>>>>> keys
>>>>>>>> are stored separately as displayed below. The following shows the keys 
>>>>>>>> of
>>>>>>>> the LevelDB store after publishing a JMS message to the broker. It can 
>>>>>>>> be
>>>>>>>> clearly seen that the keys are stored in lexicographical order.
>>>>>>>>
>>>>>>>> [image: Screenshot from 2017-08-14 19-57-13.png]
>>>>>>>>
>>>>>>>> Organize keys in such a manner also improves the efficiency of
>>>>>>>> traversing the keys using iterators when retrieving and deleting data. 
>>>>>>>> As
>>>>>>>> displayed in the diagram below, iterators traverse by starting from the
>>>>>>>> first stored key in the store. When iterator head reaches a key it can
>>>>>>>> either move to the next key or previous key. (similar to double
>>>>>>>> linked list) Hence storing related keys successively improves the
>>>>>>>> efficiency of traversing when retrieving and deleting data by reducing 
>>>>>>>> the
>>>>>>>> seeking time.
>>>>>>>>
>>>>>>>>
>>>>>>>> [image: Screenshot from 2017-08-15 02-11-40.png]
>>>>>>>>
>>>>>>>>
>>>>>>>> Basically these are the factors and decisions which have been taken
>>>>>>>> in implementing this key schema. And this schema should be extended to
>>>>>>>> provide functionalities like storing message expiration data etc. It 
>>>>>>>> would
>>>>>>>> be great to to have a feedback on the proposed schema specially 
>>>>>>>> regarding
>>>>>>>> how to reduce data repetition and improve efficiency furthermore.
>>>>>>>>
>>>>>>>> [1] https://github.com/fusesource/leveldbjni
>>>>>>>>
>>>>>>>>
>>>>>>>> Best Regards,
>>>>>>>> --
>>>>>>>>
>>>>>>>> *Wishmitha Mendis*
>>>>>>>>
>>>>>>>> *Intern - Software Engineering*
>>>>>>>> *WSO2*
>>>>>>>>
>>>>>>>> *Mobile : +94 777577706 <077%20757%207706>*
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> _______________________________________________
>>>>>>>> Architecture mailing list
>>>>>>>> [email protected]
>>>>>>>> https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> _______________________________________________
>>>>>>> Architecture mailing list
>>>>>>> [email protected]
>>>>>>> https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> *Wishmitha Mendis*
>>>>>>
>>>>>> *Intern - Software Engineering*
>>>>>> *WSO2*
>>>>>>
>>>>>> *Mobile : +94 777577706 <+94%2077%20757%207706>*
>>>>>>
>>>>>>
>>>>>>
>>>>>> _______________________________________________
>>>>>> Architecture mailing list
>>>>>> [email protected]
>>>>>> https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Asanka Abeyweera
>>>>> Senior Software Engineer
>>>>> WSO2 Inc.
>>>>>
>>>>> Phone: +94 712228648 <+94%2071%20222%208648>
>>>>> Blog: a5anka.github.io
>>>>>
>>>>> <https://wso2.com/signature>
>>>>>
>>>>> _______________________________________________
>>>>> Architecture mailing list
>>>>> [email protected]
>>>>> https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> *Wishmitha Mendis*
>>>>
>>>> *Intern - Software Engineering*
>>>> *WSO2*
>>>>
>>>> *Mobile : +94 777577706 <+94%2077%20757%207706>*
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Asanka Abeyweera
>>> Senior Software Engineer
>>> WSO2 Inc.
>>>
>>> Phone: +94 712228648 <+94%2071%20222%208648>
>>> Blog: a5anka.github.io
>>>
>>> <https://wso2.com/signature>
>>>
>>> _______________________________________________
>>> Architecture mailing list
>>> [email protected]
>>> https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
>>>
>>>
>>
>>
>> --
>> *Hasitha Abeykoon*
>> Senior Software Engineer; WSO2, Inc.; http://wso2.com
>> *cell:* *+94 719363063*
>> *blog: **abeykoon.blogspot.com* <http://abeykoon.blogspot.com>
>>
>>
>
>
> --
>
> *Wishmitha Mendis*
>
> *Intern - Software Engineering*
> *WSO2*
>
> *Mobile : +94 777577706 <+94%2077%20757%207706>*
>
>
>


-- 

*Wishmitha Mendis*

*Intern - Software Engineering*
*WSO2*

*Mobile : +94 777577706*
_______________________________________________
Architecture mailing list
[email protected]
https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture

Reply via email to