> On Sept. 24, 2012, 7:14 p.m., Alan Conway wrote:
> > I also like the approach. You're correct that HA could replace its use of
> > eachMessage with a cursor, and it probably should do so anyway.
> >
> > I think you will need to consider the case where a single message takes up
> > more than one page. Not just for very large messages, but for message
> > streams with occasional large messages where most traffic is smaller - it
> > may be sub-optimal to force the page size across the board just to cope
> > with rare large messages.

I agree that ultimately it would be nice to allow messages of any size. For an early access implementation I was considering enforcing a maximum message size (the largest message that could fit in a single page) and rejecting anything larger. However, I don't think spanning pages is too difficult to achieve, though it does add some complexity.


- Gordon


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/7209/#review11853
-----------------------------------------------------------


On Sept. 21, 2012, 3:14 p.m., Gordon Sim wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/7209/
> -----------------------------------------------------------
>
> (Updated Sept. 21, 2012, 3:14 p.m.)
>
>
> Review request for qpid, Kenneth Giusti and Ted Ross.
>
>
> Description
> -------
>
> == The Problem ==
>
> We want to be able to handle a large, growing queue without exhausting
> the limited supply of memory. To do this we want to make use of the
> filesystem.
>
> == Overview of Approach ==
>
> My design proposal allows a queue to be configured as enabling
> 'paging'. This option cannot be used in combination with LVQ or
> priority queue options at present (due to the specific ordering
> requirements of those options).
>
> A queue for which paging is enabled will be backed by a file. This
> file will be logically split into fixed-size 'pages'. Each page will
> hold a contiguous sequence of messages within it. The corresponding
> segment of the file for each page may be mapped into memory, allowing
> representations of the contained messages to be stored to, and
> recovered from, disk. The recording of message representation to disk
> for paging is entirely orthogonal to any persistence of the data for
> recovery purposes.
> This allows the encoded form to be much simpler
> since we don't need to consider recovery after broker failure.
>
> The queue thus consists of a sequence of pages. Only a fixed
> number of pages need be loaded at any given time. This frees the
> broker from having to store all the messages in memory. When a message
> from an unloaded page is required, that page can be reloaded. This may
> necessitate unloading some other page to stay within the allowed
> number of loaded pages.
>
> New pages can be created as needed, extending the file without
> explicit limit (obviously the filesystem has some finite limit). The
> sequence of pages that make up the queue need not match the sequence
> of segments within the backing file. This means that pages can be
> reused when they are emptied of all messages.
>
> == The Design ==
>
> A specific Messages implementation is used to implement a queue
> supporting paging in the manner described above.
>
> On a POSIX system it relies on mmap/munmap/msync for the mapping of
> the file into memory. This will (eventually) be hidden behind an
> abstraction allowing platforms that don't support those calls to
> supply alternative implementations.
>
> The central structure in the paged queue is a map of Page instances,
> keyed by a sequence number. The key represents the sequence of the
> first message contained by the page.
>
> All pages are the same size. Each corresponds to a particular offset
> in the file. A Page instance can be in the loaded or unloaded
> state. When loaded, the messages it contains are held in a standard
> deque from which they can be returned as needed. When loaded, the
> segment in the file it is backed by will be mapped into a particular
> region in memory.
>
> To add messages to a page it must be loaded. When a message is added,
> it is pushed onto the deque and also encoded into the region of
> memory to which the page's file segment is mapped.
>
> A page also contains two sequence sets.
> One tracks all the messages
> that are enqueued within the page, the other all the messages which
> have been acquired (the latter is always a subset of the
> former). These sequence sets are always in memory. This means each
> enqueued message will be tracked in memory and thus the memory will
> grow as the queue grows. However, the maximum memory required per
> message in the unloaded state is two sequence ranges (assuming both
> the enqueued set and acquired set are sparse and the message is
> recorded in both). In general it is anticipated the memory used will
> be even less than this. Additionally there is the memory
> overhead of the map of pages, which will grow as the queue grows even
> though not all these pages are in the loaded state. The
> expectation, though, is that the saving in memory by having most of the
> pages in a large queue in the unloaded state, in which they do not hold
> the actual messages, but merely the two sequence sets mentioned above,
> is significant.
>
> Having the acquired state held in sequence sets avoids having to
> update the file every time a message's state changes. The state of the
> message instances can be set based on the sequence sets when the page
> is loaded. The sequence sets are also currently updated based on the
> message states when the page is unloaded (this is because at present
> it is the MessageDistributor that sets the state to acquired, and that
> is not done via the Messages instance - that may be worth changing).
>
> When a subscriber moves through a queue (Messages::next()) the
> QueueCursor tracks its position. In a paged queue, we can find the page
> the next message is in by consulting the map of pages. A message at a
> given sequence will be in the last page with a key lower than or equal
> to that sequence number. That page can then be loaded if necessary, and
> the message instance within the deque found and returned.
> Locating messages for releasing, deleting etc. can be done in a similar
> manner.
>
> == Limitations and Remaining Work ==
>
> This prototype does not handle the case where a message is larger than
> a page. The page size is currently that reported by the system (it
> needs to be a multiple of this for mmap, but at present the
> multiplying factor is always 1).
>
> The selection of a currently loaded page to be 'swapped out' to allow
> another page to be loaded is currently very crude and unlikely to be
> optimal in many cases. Some refinement of this would be necessary,
> likely based on hints in terms of weightings to pages based on past
> use (which is a reasonable indicator of likelihood of future use).
>
> The Messages::foreach() method is not implemented. I think this is
> currently only used by HA on syncing a backup, and I actually think it
> could be removed and replaced with a normal cursor-based iteration
> through the queue (which would also allow the 'visibility' of messages
> to be configured, i.e. whether you see acquired messages or not).
>
> Also required is a suite of tests to fully exercise the queue and to
> explore the memory and performance characteristics in different
> scenarios in order to determine its usefulness and indicate what sorts
> of enhancements might be needed.
>
>
> This addresses bug QPID-4339.
>     https://issues.apache.org/jira/browse/QPID-4339
>
>
> Diffs
> -----
>
>   /trunk/qpid/cpp/src/Makefile.am 1388256
>   /trunk/qpid/cpp/src/qpid/broker/PagedQueue.h PRE-CREATION
>   /trunk/qpid/cpp/src/qpid/broker/PagedQueue.cpp PRE-CREATION
>   /trunk/qpid/cpp/src/qpid/broker/QueueCursor.h 1388256
>   /trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp 1388256
>   /trunk/qpid/cpp/src/qpid/broker/QueueSettings.h 1388256
>   /trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp 1388256
>   /trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h 1388256
>   /trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp 1388256
>
> Diff: https://reviews.apache.org/r/7209/diff/
>
>
> Testing
> -------
>
> make check passes (but this patch doesn't yet add any tests to it). I have
> tested with qpid-cpp-benchmark, qpid-send etc to get some basic confidence in
> the design. It does radically reduce the required memory. Use
> qpid.paging=True to enable; use qpid.max_pages to configure the number of
> active pages (page size will be more configurable in an updated version).
>
>
> Thanks,
>
> Gordon Sim
>
>
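
For anyone reading the design description above, the page lookup rule (a message at a given sequence is in the last page whose key is lower than or equal to that sequence) maps naturally onto std::map::upper_bound. The following is only a minimal sketch of that rule, not the code in the patch. PageSketch, PageMap and findPage are hypothetical names invented for illustration, and a plain uint32_t stands in for the broker's sequence number type, so any wrap-around handling is ignored.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>

// Hypothetical stand-in for the Page class in the patch; only the fields
// needed to illustrate the lookup are included.
struct PageSketch {
    std::size_t fileOffset; // offset of this page's segment in the backing file
    bool loaded;            // whether the segment is currently mapped into memory
};

// Pages keyed by the sequence number of the first message each one contains.
// Assumption: plain integer ordering, no sequence wrap-around.
typedef std::map<uint32_t, PageSketch> PageMap;

// Returns the page that would hold `sequence`: the last page whose key is
// lower than or equal to it, or pages.end() if every key is greater.
inline PageMap::iterator findPage(PageMap& pages, uint32_t sequence)
{
    PageMap::iterator i = pages.upper_bound(sequence); // first key > sequence
    if (i == pages.begin()) return pages.end();        // no key <= sequence
    return --i;                                        // last key <= sequence
}
```

With pages keyed at 1, 50 and 120, a cursor looking for sequence 49 would get the page keyed at 1, and one looking for sequence 50 would get the page keyed at 50. The caller would then load that page if it is not already loaded and search its deque for the message. The lookup is logarithmic in the number of pages and touches only the in-memory map, never the file.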
