On 21 November 2016 at 11:40, Francesco Chicchiriccò
<[email protected]> wrote:
> Hi all,
> not sure but it seems that the commit below broke my scheduled import from 
> mbox:

It won't be that commit, most likely the fix for #251

https://github.com/apache/incubator-ponymail/commit/1a3bff403166c917738fd02acefc988b909d4eae#diff-0102373f79eaa72ffaff3ce7675b6a43

This presumably means the archiver would have fallen over with the same e-mail.
Or there is an encoding problem with writing the mail to the mbox - or
reading it - so the importer is not seeing the same input as the
archiver.

It would be useful to know what the message is that causes the issue.

If you can find it I can take a look later.

> Exception in thread Thread-1:
> Traceback (most recent call last):
>   File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
>     self.run()
>   File "import-mbox.py", line 297, in run
>     'source': message.as_string()
>   File "/usr/lib/python3.5/email/message.py", line 159, in as_string
>     g.flatten(self, unixfrom=unixfrom)
>   File "/usr/lib/python3.5/email/generator.py", line 115, in flatten
>     self._write(msg)
>   File "/usr/lib/python3.5/email/generator.py", line 181, in _write
>     self._dispatch(msg)
>   File "/usr/lib/python3.5/email/generator.py", line 214, in _dispatch
>     meth(msg)
>   File "/usr/lib/python3.5/email/generator.py", line 243, in _handle_text
>     msg.set_payload(payload, charset)
>   File "/usr/lib/python3.5/email/message.py", line 316, in set_payload
>     payload = payload.encode(charset.output_charset)
> UnicodeEncodeError: 'ascii' codec can't encode character '\ufffd' in position 
> 3657: ordinal not in range(128)
>
> Any hint / workaround?
>
> On 2016-11-21 00:20 (+0100), [email protected] wrote:
>> Repository: incubator-ponymail
>> Updated Branches:
>>   refs/heads/master 1a3bff403 -> af1544e7b
>>
>>
>> import-mbox.py messages need the thread number
>>
>> This fixes #248
>>
>> Project: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/repo
>> Commit: 
>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/commit/af1544e7
>> Tree: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/tree/af1544e7
>> Diff: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/diff/af1544e7
>>
>> Branch: refs/heads/master
>> Commit: af1544e7b63d81a5998a4b3a1471586d63d72a4e
>> Parents: 1a3bff4
>> Author: Sebb <[email protected]>
>> Authored: Sun Nov 20 23:19:55 2016 +0000
>> Committer: Sebb <[email protected]>
>> Committed: Sun Nov 20 23:19:55 2016 +0000
>>
>> ----------------------------------------------------------------------
>>  tools/import-mbox.py | 59 +++++++++++++++++++++++++++--------------------
>>  1 file changed, 34 insertions(+), 25 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/blob/af1544e7/tools/import-mbox.py
>> ----------------------------------------------------------------------
>> diff --git a/tools/import-mbox.py b/tools/import-mbox.py
>> index 15f09ad..12bc0d1 100755
>> --- a/tools/import-mbox.py
>> +++ b/tools/import-mbox.py
>> @@ -107,7 +107,9 @@ es = Elasticsearch([
>>  rootURL = ""
>>
>>  class BulkThread(Thread):
>> -    def assign(self, json, xes, dtype = 'mbox', wc = 'quorum'):
>> +
>> +    def assign(self, id, json, xes, dtype = 'mbox', wc = 'quorum'):
>> +        self.id = id
>>          self.json = json
>>          self.xes = xes
>>          self.dtype = dtype
>> @@ -133,17 +135,24 @@ class BulkThread(Thread):
>>          try:
>>              helpers.bulk(self.xes, js_arr)
>>          except Exception as err:
>> -            print("Warning: Could not bulk insert: %s" % err)
>> -        #print("Inserted %u entries" % len(js_arr))
>> +            print("%d: Warning: Could not bulk insert: %s into %s" % 
>> (self.id,err,self.dtype))
>> +#         print("%d: Inserted %u entries into %s" % (self.id, 
>> len(js_arr),self.dtype))
>>
>>
>>  class SlurpThread(Thread):
>>
>> +    def __init__(self, index):
>> +        self.id = index
>> +        super(SlurpThread, self).__init__()
>> +
>> +    def printid(self,message):
>> +        print("%d: %s" % (self.id, message))
>> +
>>      def run(self):
>>          global block, y, es, lists, baddies, config, resendTo, timeout, 
>> dedupped, dedup
>>          ja = []
>>          jas = []
>> -        print("Thread started")
>> +        self.printid("Thread started")
>>          mla = None
>>          ml = ""
>>          mboxfile = ""
>> @@ -152,16 +161,16 @@ class SlurpThread(Thread):
>>          archie = archiver.Archiver(parseHTML = parseHTML)
>>
>>          while len(lists) > 0:
>> -            print("%u elements left to slurp" % len(lists))
>> +            self.printid("%u elements left to slurp" % len(lists))
>>
>>              block.acquire()
>>              try:
>>                  mla = lists.pop(0)
>>                  if not mla:
>> -                    print("Nothing more to do here")
>> +                    self.printid("Nothing more to do here")
>>                      return
>>              except Exception as err:
>> -                print("Could not pop list: %s" % err)
>> +                self.printid("Could not pop list: %s" % err)
>>                  return
>>              finally:
>>                  block.release()
>> @@ -184,7 +193,7 @@ class SlurpThread(Thread):
>>                  tmpname = mla[0]
>>                  filename = mla[0]
>>                  if filename.find(".gz") != -1:
>> -                    print("Decompressing %s..." % filename)
>> +                    self.printid("Decompressing %s..." % filename)
>>                      try:
>>                          with open(filename, "rb") as bf:
>>                              bmd = bf.read()
>> @@ -197,16 +206,16 @@ class SlurpThread(Thread):
>>                              tmpname = tmpfile.name
>>                              filename = tmpname
>>                              dFile = True # Slated for deletion upon having 
>> been read
>> -                            print("%s -> %u bytes" % (tmpname, len(bmd)))
>> +                            self.printid("%s -> %u bytes" % (tmpname, 
>> len(bmd)))
>>                      except Exception as err:
>> -                        print("This wasn't a gzip file: %s" % err )
>> -                print("Slurping %s" % filename)
>> +                        self.printid("This wasn't a gzip file: %s" % err )
>> +                self.printid("Slurping %s" % filename)
>>                  messages = mailbox.mbox(tmpname)
>>
>>              else:
>>                  ml = mla[0]
>>                  mboxfile = mla[1]
>> -                print("Slurping %s/%s" % (ml, mboxfile))
>> +                self.printid("Slurping %s/%s" % (ml, mboxfile))
>>                  m = re.match(r"(\d\d\d\d)(\d\d)", mboxfile)
>>                  EY = 1997
>>                  EM = 1
>> @@ -232,7 +241,7 @@ class SlurpThread(Thread):
>>                  if fromFilter and 'from' in message and 
>> message['from'].find(fromFilter) == -1:
>>                      continue
>>                  if resendTo:
>> -                    print("Delivering message %s via MTA" % 
>> message['message-id'] if 'message-id' in message else '??')
>> +                    self.printid("Delivering message %s via MTA" % 
>> message['message-id'] if 'message-id' in message else '??')
>>                      s = SMTP('localhost')
>>                      try:
>>                          if list_override:
>> @@ -245,7 +254,7 @@ class SlurpThread(Thread):
>>                      s.send_message(message, from_addr=None, 
>> to_addrs=(resendTo))
>>                      continue
>>                  if (time.time() - stime > timeout): # break out after N 
>> seconds, it shouldn't take this long..!
>> -                    print("Whoa, this is taking way too long, ignoring %s 
>> for now" % tmpname)
>> +                    self.printid("Whoa, this is taking way too long, 
>> ignoring %s for now" % tmpname)
>>                      break
>>
>>                  json, contents = archie.compute_updates(list_override, 
>> private, message)
>> @@ -271,7 +280,7 @@ class SlurpThread(Thread):
>>                          }
>>                      )
>>                      if res and len(res['hits']['hits']) > 0:
>> -                        print("Dedupping %s" % json['message-id'])
>> +                        self.printid("Dedupping %s" % json['message-id'])
>>                          dedupped += 1
>>                          continue
>>
>> @@ -305,43 +314,43 @@ class SlurpThread(Thread):
>>                      if len(ja) >= 40:
>>                          if not args.dry:
>>                              bulk = BulkThread()
>> -                            bulk.assign(ja, es, 'mbox')
>> +                            bulk.assign(self.id, ja, es, 'mbox')
>>                              bulk.insert()
>>                          ja = []
>>
>>                          if not args.dry:
>>                              bulks = BulkThread()
>> -                            bulks.assign(jas, es, 'mbox_source')
>> +                            bulks.assign(self.id, jas, es, 'mbox_source')
>>                              bulks.insert()
>>                          jas = []
>>                  else:
>> -                    print("Failed to parse: Return=%s Message-Id=%s" % 
>> (message.get('Return-Path'), message.get('Message-Id')))
>> +                    self.printid("Failed to parse: Return=%s Message-Id=%s" 
>> % (message.get('Return-Path'), message.get('Message-Id')))
>>                      bad += 1
>>
>>              if filebased:
>> -                print("Parsed %u records (failed: %u) from %s" % (count, 
>> bad, filename))
>> +                self.printid("Parsed %u records (failed: %u) from %s" % 
>> (count, bad, filename))
>>                  if dFile:
>>                      os.unlink(tmpname)
>>              elif imap:
>> -                print("Parsed %u records (failed: %u) from imap" % (count, 
>> bad))
>> +                self.printid("Parsed %u records (failed: %u) from imap" % 
>> (count, bad))
>>              else:
>> -                print("Parsed %s/%s: %u records (failed: %u) from %s" % 
>> (ml, mboxfile, count, bad, tmpname))
>> +                self.printid("Parsed %s/%s: %u records (failed: %u) from 
>> %s" % (ml, mboxfile, count, bad, tmpname))
>>                  os.unlink(tmpname)
>>
>>              y += count
>>              baddies += bad
>>              if not args.dry:
>>                  bulk = BulkThread()
>> -                bulk.assign(ja, es, 'mbox')
>> +                bulk.assign(self.id, ja, es, 'mbox')
>>                  bulk.insert()
>>              ja = []
>>
>>              if not args.dry:
>>                  bulks = BulkThread()
>> -                bulks.assign(jas, es, 'mbox_source')
>> +                bulks.assign(self.id, jas, es, 'mbox_source')
>>                  bulks.insert()
>>              jas = []
>> -        print("Done, %u elements left to slurp" % len(lists))
>> +        self.printid("Done, %u elements left to slurp" % len(lists))
>>
>>  parser = argparse.ArgumentParser(description='Command line options.')
>>  parser.add_argument('--source', dest='source', type=str, nargs=1,
>> @@ -637,7 +646,7 @@ threads = []
>>  cc = min(len(lists), int( multiprocessing.cpu_count() / 2) + 1)
>>  print("Starting up to %u threads to fetch the %u %s lists" % (cc, 
>> len(lists), project))
>>  for i in range(1,cc+1):
>> -    t = SlurpThread()
>> +    t = SlurpThread(i)
>>      threads.append(t)
>>      t.start()
>>      print("Started no. %u" % i)
>>
>>

Reply via email to