On Mon, 13 Dec 2021 at 00:22, sebb <[email protected]> wrote:
>
> On Sun, 12 Dec 2021 at 17:16, sebb <[email protected]> wrote:
> >
> > On Sun, 12 Dec 2021 at 15:25, Daniel Gruno <[email protected]> wrote:
> > >
> > > I get that this could save some megabytes of memory, but what is the
> > > speed implication here?
> >
> > AFAICT no implication unless yielding is inefficient.
> >
> > > If every document is fetched on a one-by-one basis
> >
> > The source documents were anyway fetched singly.
> > This change only affects the mbox data.
>
> On looking again at the original code:
>
> messages.query calls session.database.scan repeatedly, and builds up
> the list of responses up to the max.
> Note that session.database.scan returns one hit at a time, not the
> full list of hits from each scroll batch.
>
> The change I made allows direct access to the validated hits, one at a
> time, rather than collecting them all (up to the max).
> It has added one level of yield indirection. Does this really cause a 
> slowdown?
>
> It might make sense for session.database.scan to return the list of
> hits from each scroll batch, but that is a different issue.

FTR, that has now been implemented.

mbox.py now processes batches of responses rather than getting a
single response with all mails.

> > >, how big of a slowdown will we see for large data sets over the
> > > wire?
> >
> > Unless yield is inefficient it should make little difference.
> >
> > > Maybe this is better suited as a configurable option for the
> > > back-end so people can favor high-mem fast ops versus low-mem slow ops?
> >
> > If it turns out to be inefficient, then an adjustment might be necessary.
> >
> > For example, yield all the visible mails from a batch at once, rather
> > than individually.
> >
> > > On 12/12/2021 13.28, [email protected] wrote:
> > > > This is an automated email from the ASF dual-hosted git repository.
> > > >
> > > > sebb pushed a commit to branch master
> > > > in repository 
> > > > https://gitbox.apache.org/repos/asf/incubator-ponymail-foal.git
> > > >
> > > > commit 8c48b5b70c4dc5d0831bfde9e1e8e53129c6aa34
> > > > Author: Sebb <[email protected]>
> > > > AuthorDate: Sun Dec 12 12:26:01 2021 +0000
> > > >
> > > >      mbox.py is inefficient for large mailboxes
> > > >
> > > >      This fixes #172
> > > > ---
> > > >   server/endpoints/mbox.py   | 15 +++++++--------
> > > >   server/plugins/messages.py | 40 
> > > > ++++++++++++++++++++++++++++++++--------
> > > >   2 files changed, 39 insertions(+), 16 deletions(-)
> > > >
> > > > diff --git a/server/endpoints/mbox.py b/server/endpoints/mbox.py
> > > > index fb6ee2b..8b34362 100644
> > > > --- a/server/endpoints/mbox.py
> > > > +++ b/server/endpoints/mbox.py
> > > > @@ -86,13 +86,6 @@ async def process(
> > > >           return aiohttp.web.Response(headers={"content-type": 
> > > > "text/plain",}, status=400, text=str(ve))
> > > >       except AssertionError as ae:  # If defuzzer encounters internal 
> > > > errors, it will throw an AssertionError
> > > >           return aiohttp.web.Response(headers={"content-type": 
> > > > "text/plain",}, status=500, text=str(ae))
> > > > -    results = await plugins.messages.query(
> > > > -        session,
> > > > -        query_defuzzed,
> > > > -        query_limit=server.config.database.max_hits,
> > > > -        metadata_only=True,
> > > > -        epoch_order="asc"
> > > > -    )
> > > >
> > > >       dlstem = f"{lid}_{domain}"
> > > >       if yyyymm:
> > > > @@ -109,7 +102,13 @@ async def process(
> > > >       response = aiohttp.web.StreamResponse(status=200, headers=headers)
> > > >       response.enable_chunked_encoding()
> > > >       await response.prepare(request)
> > > > -    for email in results:
> > > > +
> > > > +    async for email in plugins.messages.query_each(
> > > > +        session,
> > > > +        query_defuzzed,
> > > > +        metadata_only=True,
> > > > +        epoch_order="asc"
> > > > +    ):
> > > >           mboxrd_source = await convert_source(session, email)
> > > >           # Ensure each non-empty source ends with a blank line
> > > >           if not mboxrd_source.endswith("\n\n"):
> > > > diff --git a/server/plugins/messages.py b/server/plugins/messages.py
> > > > index f6abcba..47ca7d7 100644
> > > > --- a/server/plugins/messages.py
> > > > +++ b/server/plugins/messages.py
> > > > @@ -316,10 +316,9 @@ async def get_source(session: 
> > > > plugins.session.SessionObject, permalink: str = No
> > > >       return None
> > > >
> > > >
> > > > -async def query(
> > > > +async def query_each(
> > > >       session: plugins.session.SessionObject,
> > > >       query_defuzzed,
> > > > -    query_limit=10000,
> > > >       hide_deleted=True,
> > > >       metadata_only=False,
> > > >       epoch_order="desc",
> > > > @@ -328,9 +327,8 @@ async def query(
> > > >       """
> > > >       Advanced query and grab for stats.py
> > > >       Also called by mbox.py (using metadata_only=True)
> > > > +    Yields results singly
> > > >       """
> > > > -    docs = []
> > > > -    hits = 0
> > > >       assert session.database, DATABASE_NOT_CONNECTED
> > > >       preserve_order = True if epoch_order == "asc" else False
> > > >       es_query = {
> > > > @@ -378,10 +376,36 @@ async def query(
> > > >                   for hdr in MUST_HAVE:
> > > >                       if not hdr in source_fields and hdr in doc:
> > > >                           del doc[hdr]
> > > > -            docs.append(doc)
> > > > -            hits += 1
> > > > -            if hits > query_limit:
> > > > -                break
> > > > +            yield doc
> > > > +
> > > > +
> > > > +async def query(
> > > > +    session: plugins.session.SessionObject,
> > > > +    query_defuzzed,
> > > > +    query_limit=10000,
> > > > +    hide_deleted=True,
> > > > +    metadata_only=False,
> > > > +    epoch_order="desc",
> > > > +    source_fields=None
> > > > +):
> > > > +    """
> > > > +    Advanced query and grab for stats.py
> > > > +    Also called by mbox.py (using metadata_only=True)
> > > > +    """
> > > > +    docs = []
> > > > +    hits = 0
> > > > +    async for doc in query_each(
> > > > +        session,
> > > > +        query_defuzzed,
> > > > +        hide_deleted=hide_deleted,
> > > > +        metadata_only=metadata_only,
> > > > +        epoch_order=epoch_order,
> > > > +        source_fields=source_fields
> > > > +    ):
> > > > +        docs.append(doc)
> > > > +        hits += 1
> > > > +        if hits > query_limit:
> > > > +            break
> > > >       return docs
> > > >
> > > >
> > >

Reply via email to