Sorry about that; I changed the thread id to its name but did not update all the references.
Should be OK now.
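For what it's worth, a minimal sketch of the idea behind the change (illustrative only, not the committed code): if the per-thread log prefix comes straight from Thread's built-in name, there is no separate id attribute whose references can go stale.

```python
from threading import Thread

class SlurpThread(Thread):
    # Illustrative sketch: derive the log prefix from Thread's built-in
    # .name instead of keeping a separate self.id attribute in sync.
    def printid(self, message):
        print("%s: %s" % (self.name, message))

t = SlurpThread(name="Thread-1")
t.printid("Thread started")  # prints "Thread-1: Thread started"
```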

Going back to the original encoding issue: I have tried and failed to
reproduce it.

Can you find out which mbox caused the problem so I can take a look?
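In case it helps narrow things down, here is a minimal sketch of the failure mode in the traceback below (the payload and charset are invented, not taken from your mbox): when a str payload is re-encoded with a declared charset that cannot represent a character such as U+FFFD, set_payload() raises the same UnicodeEncodeError, and the traceback shows as_string() hitting that call internally.

```python
from email.message import Message

# Invented example: the declared charset is us-ascii, but the payload
# already contains U+FFFD (the Unicode replacement character), as in
# the traceback below.
msg = Message()
msg['Content-Type'] = 'text/plain; charset="us-ascii"'

try:
    # set_payload() re-encodes a str payload with the declared charset;
    # the traceback below shows as_string() reaching this same call.
    msg.set_payload("body text \ufffd", charset="us-ascii")
    error = None
except UnicodeEncodeError as err:
    error = err

print(error)
```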


On 22 November 2016 at 07:23, Francesco Chicchiriccò
<[email protected]> wrote:
> Hi all,
> after latest commits, I get now the following error when importing from
> mbox:
>
> Exception in thread Thread-1:
> Traceback (most recent call last):
>   File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
>     self.run()
>   File "import-mbox.py", line 314, in run
>     bulk.assign(self.id, ja, es, 'mbox')
> AttributeError: 'SlurpThread' object has no attribute 'id'
>
> Regards.
>
>
> On 21/11/2016 17:19, sebb wrote:
>>
>> On 21 November 2016 at 11:52, Daniel Gruno <[email protected]> wrote:
>>>
>>> On 11/21/2016 12:50 PM, sebb wrote:
>>>>
>>>> On 21 November 2016 at 11:40, Francesco Chicchiriccò
>>>> <[email protected]> wrote:
>>>>>
>>>>> Hi all,
>>>>> not sure but it seems that the commit below broke my scheduled import
>>>>> from mbox:
>>>>
>>>> It won't be that commit, most likely the fix for #251
>>>>
>>>>
>>>> https://github.com/apache/incubator-ponymail/commit/1a3bff403166c917738fd02acefc988b909d4eae#diff-0102373f79eaa72ffaff3ce7675b6a43
>>>>
>>>> This presumably means the archiver would have fallen over with the same
>>>> e-mail.
>>>> Or there is an encoding problem with writing the mail to the mbox - or
>>>> reading it - so the importer is not seeing the same input as the
>>>> archiver.
>>>
>>> The importer usually sees things as ASCII, whereas the archiver _can_
>>> get fed input as unicode by postfix (I don't know why, but there it is).
>>> This may explain why. I think as_bytes is a safer way to archive, as
>>> it's binary.
>>
>> That all depends how the binary is generated.
>> As far as I can tell, the parsed message is not stored as binary, so
>> it has to be encoded to create the bytes.
>>
>>>> It would be useful to know what the message is that causes the issue.
>>>>
>>>> If you can find it I can take a look later.
>>>>
>>>>> Exception in thread Thread-1:
>>>>> Traceback (most recent call last):
>>>>>    File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
>>>>>      self.run()
>>>>>    File "import-mbox.py", line 297, in run
>>>>>      'source': message.as_string()
>>>>>    File "/usr/lib/python3.5/email/message.py", line 159, in as_string
>>>>>      g.flatten(self, unixfrom=unixfrom)
>>>>>    File "/usr/lib/python3.5/email/generator.py", line 115, in flatten
>>>>>      self._write(msg)
>>>>>    File "/usr/lib/python3.5/email/generator.py", line 181, in _write
>>>>>      self._dispatch(msg)
>>>>>    File "/usr/lib/python3.5/email/generator.py", line 214, in _dispatch
>>>>>      meth(msg)
>>>>>    File "/usr/lib/python3.5/email/generator.py", line 243, in _handle_text
>>>>>      msg.set_payload(payload, charset)
>>>>>    File "/usr/lib/python3.5/email/message.py", line 316, in set_payload
>>>>>      payload = payload.encode(charset.output_charset)
>>>>> UnicodeEncodeError: 'ascii' codec can't encode character '\ufffd' in
>>>>> position 3657: ordinal not in range(128)
>>>>>
>>>>> Any hint / workaround?
>>>>>
>>>>> On 2016-11-21 00:20 (+0100), [email protected] wrote:
>>>>>>
>>>>>> Repository: incubator-ponymail
>>>>>> Updated Branches:
>>>>>>    refs/heads/master 1a3bff403 -> af1544e7b
>>>>>>
>>>>>>
>>>>>> import-mbox.py messages need the thread number
>>>>>>
>>>>>> This fixes #248
>>>>>>
>>>>>> Project: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/repo
>>>>>> Commit: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/commit/af1544e7
>>>>>> Tree: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/tree/af1544e7
>>>>>> Diff: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/diff/af1544e7
>>>>>>
>>>>>> Branch: refs/heads/master
>>>>>> Commit: af1544e7b63d81a5998a4b3a1471586d63d72a4e
>>>>>> Parents: 1a3bff4
>>>>>> Author: Sebb <[email protected]>
>>>>>> Authored: Sun Nov 20 23:19:55 2016 +0000
>>>>>> Committer: Sebb <[email protected]>
>>>>>> Committed: Sun Nov 20 23:19:55 2016 +0000
>>>>>>
>>>>>> ----------------------------------------------------------------------
>>>>>>   tools/import-mbox.py | 59 +++++++++++++++++++++++++++--------------------
>>>>>>   1 file changed, 34 insertions(+), 25 deletions(-)
>>>>>> ----------------------------------------------------------------------
>>>>>>
>>>>>>
>>>>>>
>>>>>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/blob/af1544e7/tools/import-mbox.py
>>>>>> ----------------------------------------------------------------------
>>>>>> diff --git a/tools/import-mbox.py b/tools/import-mbox.py
>>>>>> index 15f09ad..12bc0d1 100755
>>>>>> --- a/tools/import-mbox.py
>>>>>> +++ b/tools/import-mbox.py
>>>>>> @@ -107,7 +107,9 @@ es = Elasticsearch([
>>>>>>   rootURL = ""
>>>>>>
>>>>>>   class BulkThread(Thread):
>>>>>> -    def assign(self, json, xes, dtype = 'mbox', wc = 'quorum'):
>>>>>> +
>>>>>> +    def assign(self, id, json, xes, dtype = 'mbox', wc = 'quorum'):
>>>>>> +        self.id = id
>>>>>>           self.json = json
>>>>>>           self.xes = xes
>>>>>>           self.dtype = dtype
>>>>>> @@ -133,17 +135,24 @@ class BulkThread(Thread):
>>>>>>           try:
>>>>>>               helpers.bulk(self.xes, js_arr)
>>>>>>           except Exception as err:
>>>>>> -            print("Warning: Could not bulk insert: %s" % err)
>>>>>> -        #print("Inserted %u entries" % len(js_arr))
>>>>>> +            print("%d: Warning: Could not bulk insert: %s into %s" % (self.id,err,self.dtype))
>>>>>> +#         print("%d: Inserted %u entries into %s" % (self.id, len(js_arr),self.dtype))
>>>>>>
>>>>>>
>>>>>>   class SlurpThread(Thread):
>>>>>>
>>>>>> +    def __init__(self, index):
>>>>>> +        self.id = index
>>>>>> +        super(SlurpThread, self).__init__()
>>>>>> +
>>>>>> +    def printid(self,message):
>>>>>> +        print("%d: %s" % (self.id, message))
>>>>>> +
>>>>>>       def run(self):
>>>>>>           global block, y, es, lists, baddies, config, resendTo, timeout, dedupped, dedup
>>>>>>           ja = []
>>>>>>           jas = []
>>>>>> -        print("Thread started")
>>>>>> +        self.printid("Thread started")
>>>>>>           mla = None
>>>>>>           ml = ""
>>>>>>           mboxfile = ""
>>>>>> @@ -152,16 +161,16 @@ class SlurpThread(Thread):
>>>>>>           archie = archiver.Archiver(parseHTML = parseHTML)
>>>>>>
>>>>>>           while len(lists) > 0:
>>>>>> -            print("%u elements left to slurp" % len(lists))
>>>>>> +            self.printid("%u elements left to slurp" % len(lists))
>>>>>>
>>>>>>               block.acquire()
>>>>>>               try:
>>>>>>                   mla = lists.pop(0)
>>>>>>                   if not mla:
>>>>>> -                    print("Nothing more to do here")
>>>>>> +                    self.printid("Nothing more to do here")
>>>>>>                       return
>>>>>>               except Exception as err:
>>>>>> -                print("Could not pop list: %s" % err)
>>>>>> +                self.printid("Could not pop list: %s" % err)
>>>>>>                   return
>>>>>>               finally:
>>>>>>                   block.release()
>>>>>> @@ -184,7 +193,7 @@ class SlurpThread(Thread):
>>>>>>                   tmpname = mla[0]
>>>>>>                   filename = mla[0]
>>>>>>                   if filename.find(".gz") != -1:
>>>>>> -                    print("Decompressing %s..." % filename)
>>>>>> +                    self.printid("Decompressing %s..." % filename)
>>>>>>                       try:
>>>>>>                           with open(filename, "rb") as bf:
>>>>>>                               bmd = bf.read()
>>>>>> @@ -197,16 +206,16 @@ class SlurpThread(Thread):
>>>>>>                               tmpname = tmpfile.name
>>>>>>                               filename = tmpname
>>>>>>                               dFile = True # Slated for deletion upon having been read
>>>>>> -                            print("%s -> %u bytes" % (tmpname, len(bmd)))
>>>>>> +                            self.printid("%s -> %u bytes" % (tmpname, len(bmd)))
>>>>>>                       except Exception as err:
>>>>>> -                        print("This wasn't a gzip file: %s" % err )
>>>>>> -                print("Slurping %s" % filename)
>>>>>> +                        self.printid("This wasn't a gzip file: %s" % err )
>>>>>> +                self.printid("Slurping %s" % filename)
>>>>>>                   messages = mailbox.mbox(tmpname)
>>>>>>
>>>>>>               else:
>>>>>>                   ml = mla[0]
>>>>>>                   mboxfile = mla[1]
>>>>>> -                print("Slurping %s/%s" % (ml, mboxfile))
>>>>>> +                self.printid("Slurping %s/%s" % (ml, mboxfile))
>>>>>>                   m = re.match(r"(\d\d\d\d)(\d\d)", mboxfile)
>>>>>>                   EY = 1997
>>>>>>                   EM = 1
>>>>>> @@ -232,7 +241,7 @@ class SlurpThread(Thread):
>>>>>>                   if fromFilter and 'from' in message and message['from'].find(fromFilter) == -1:
>>>>>>                       continue
>>>>>>                   if resendTo:
>>>>>> -                    print("Delivering message %s via MTA" % message['message-id'] if 'message-id' in message else '??')
>>>>>> +                    self.printid("Delivering message %s via MTA" % message['message-id'] if 'message-id' in message else '??')
>>>>>>                       s = SMTP('localhost')
>>>>>>                       try:
>>>>>>                           if list_override:
>>>>>> @@ -245,7 +254,7 @@ class SlurpThread(Thread):
>>>>>>                       s.send_message(message, from_addr=None, to_addrs=(resendTo))
>>>>>>                       continue
>>>>>>                   if (time.time() - stime > timeout): # break out after N seconds, it shouldn't take this long..!
>>>>>> -                    print("Whoa, this is taking way too long, ignoring %s for now" % tmpname)
>>>>>> +                    self.printid("Whoa, this is taking way too long, ignoring %s for now" % tmpname)
>>>>>>                       break
>>>>>>
>>>>>>                   json, contents = archie.compute_updates(list_override, private, message)
>>>>>> @@ -271,7 +280,7 @@ class SlurpThread(Thread):
>>>>>>                           }
>>>>>>                       )
>>>>>>                       if res and len(res['hits']['hits']) > 0:
>>>>>> -                        print("Dedupping %s" % json['message-id'])
>>>>>> +                        self.printid("Dedupping %s" % json['message-id'])
>>>>>>                           dedupped += 1
>>>>>>                           continue
>>>>>>
>>>>>> @@ -305,43 +314,43 @@ class SlurpThread(Thread):
>>>>>>                       if len(ja) >= 40:
>>>>>>                           if not args.dry:
>>>>>>                               bulk = BulkThread()
>>>>>> -                            bulk.assign(ja, es, 'mbox')
>>>>>> +                            bulk.assign(self.id, ja, es, 'mbox')
>>>>>>                               bulk.insert()
>>>>>>                           ja = []
>>>>>>
>>>>>>                           if not args.dry:
>>>>>>                               bulks = BulkThread()
>>>>>> -                            bulks.assign(jas, es, 'mbox_source')
>>>>>> +                            bulks.assign(self.id, jas, es, 'mbox_source')
>>>>>>                               bulks.insert()
>>>>>>                           jas = []
>>>>>>                   else:
>>>>>> -                    print("Failed to parse: Return=%s Message-Id=%s" % (message.get('Return-Path'), message.get('Message-Id')))
>>>>>> +                    self.printid("Failed to parse: Return=%s Message-Id=%s" % (message.get('Return-Path'), message.get('Message-Id')))
>>>>>>                       bad += 1
>>>>>>
>>>>>>               if filebased:
>>>>>> -                print("Parsed %u records (failed: %u) from %s" % (count, bad, filename))
>>>>>> +                self.printid("Parsed %u records (failed: %u) from %s" % (count, bad, filename))
>>>>>>                   if dFile:
>>>>>>                       os.unlink(tmpname)
>>>>>>               elif imap:
>>>>>> -                print("Parsed %u records (failed: %u) from imap" % (count, bad))
>>>>>> +                self.printid("Parsed %u records (failed: %u) from imap" % (count, bad))
>>>>>>               else:
>>>>>> -                print("Parsed %s/%s: %u records (failed: %u) from %s" % (ml, mboxfile, count, bad, tmpname))
>>>>>> +                self.printid("Parsed %s/%s: %u records (failed: %u) from %s" % (ml, mboxfile, count, bad, tmpname))
>>>>>>                   os.unlink(tmpname)
>>>>>>
>>>>>>               y += count
>>>>>>               baddies += bad
>>>>>>               if not args.dry:
>>>>>>                   bulk = BulkThread()
>>>>>> -                bulk.assign(ja, es, 'mbox')
>>>>>> +                bulk.assign(self.id, ja, es, 'mbox')
>>>>>>                   bulk.insert()
>>>>>>               ja = []
>>>>>>
>>>>>>               if not args.dry:
>>>>>>                   bulks = BulkThread()
>>>>>> -                bulks.assign(jas, es, 'mbox_source')
>>>>>> +                bulks.assign(self.id, jas, es, 'mbox_source')
>>>>>>                   bulks.insert()
>>>>>>               jas = []
>>>>>> -        print("Done, %u elements left to slurp" % len(lists))
>>>>>> +        self.printid("Done, %u elements left to slurp" % len(lists))
>>>>>>
>>>>>>   parser = argparse.ArgumentParser(description='Command line options.')
>>>>>>   parser.add_argument('--source', dest='source', type=str, nargs=1,
>>>>>> @@ -637,7 +646,7 @@ threads = []
>>>>>>   cc = min(len(lists), int( multiprocessing.cpu_count() / 2) + 1)
>>>>>>   print("Starting up to %u threads to fetch the %u %s lists" % (cc, len(lists), project))
>>>>>>   for i in range(1,cc+1):
>>>>>> -    t = SlurpThread()
>>>>>> +    t = SlurpThread(i)
>>>>>>       threads.append(t)
>>>>>>       t.start()
>>>>>>       print("Started no. %u" % i)
>>>>>>
>>>>>>
>
>
>
> --
> Francesco Chicchiriccò
>
> Tirasa - Open Source Excellence
> http://www.tirasa.net/
>
> Member at The Apache Software Foundation
> Syncope, Cocoon, Olingo, CXF, OpenJPA, PonyMail
> http://home.apache.org/~ilgrosso/
>

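PS: a small sketch of Daniel's point about as_bytes() being safer for archiving (the raw message below is invented): a message parsed from bytes keeps undecodable bytes via surrogateescape, and as_bytes() writes them back out unchanged, whereas as_string() has to pick a text encoding and can fail the way the traceback above shows.

```python
import email

# Invented raw message: the headers claim us-ascii but the body holds
# a raw 0xFF byte, the kind of input that trips up text-based output.
raw = (b"From: [email protected]\n"
       b"Content-Type: text/plain; charset=\"us-ascii\"\n"
       b"\n"
       b"body with a raw byte: \xff\n")

msg = email.message_from_bytes(raw)

# The undecodable byte is stored internally with surrogateescape, and
# as_bytes() round-trips it back to the original 0xFF.
out = msg.as_bytes()
print(b"\xff" in out)  # True
```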