On 11/21/2016 12:50 PM, sebb wrote:
> On 21 November 2016 at 11:40, Francesco Chicchiriccò
> <[email protected]> wrote:
>> Hi all,
>> not sure but it seems that the commit below broke my scheduled import from 
>> mbox:
> 
> It won't be that commit, most likely the fix for #251
> 
> https://github.com/apache/incubator-ponymail/commit/1a3bff403166c917738fd02acefc988b909d4eae#diff-0102373f79eaa72ffaff3ce7675b6a43
> 
> This presumably means the archiver would have fallen over with the same 
> e-mail.
> Or there is an encoding problem with writing the mail to the mbox - or
> reading it - so the importer is not seeing the same input as the
> archiver.

The importer usually sees things as ASCII, whereas the archiver _can_
get fed input as unicode by postfix (I don't know why, but there it is).
This may explain the discrepancy between the two. I think as_bytes is a
safer way to archive, as it's binary.

> 
> It would be useful to know what the message is that causes the issue.
> 
> If you can find it I can take a look later.
> 
>> Exception in thread Thread-1:
>> Traceback (most recent call last):
>>   File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
>>     self.run()
>>   File "import-mbox.py", line 297, in run
>>     'source': message.as_string()
>>   File "/usr/lib/python3.5/email/message.py", line 159, in as_string
>>     g.flatten(self, unixfrom=unixfrom)
>>   File "/usr/lib/python3.5/email/generator.py", line 115, in flatten
>>     self._write(msg)
>>   File "/usr/lib/python3.5/email/generator.py", line 181, in _write
>>     self._dispatch(msg)
>>   File "/usr/lib/python3.5/email/generator.py", line 214, in _dispatch
>>     meth(msg)
>>   File "/usr/lib/python3.5/email/generator.py", line 243, in _handle_text
>>     msg.set_payload(payload, charset)
>>   File "/usr/lib/python3.5/email/message.py", line 316, in set_payload
>>     payload = payload.encode(charset.output_charset)
>> UnicodeEncodeError: 'ascii' codec can't encode character '\ufffd' in 
>> position 3657: ordinal not in range(128)
>>
>> Any hint / workaround?
>>
>> On 2016-11-21 00:20 (+0100), [email protected] wrote:
>>> Repository: incubator-ponymail
>>> Updated Branches:
>>>   refs/heads/master 1a3bff403 -> af1544e7b
>>>
>>>
>>> import-mbox.py messages need the thread number
>>>
>>> This fixes #248
>>>
>>> Project: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/repo
>>> Commit: 
>>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/commit/af1544e7
>>> Tree: 
>>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/tree/af1544e7
>>> Diff: 
>>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/diff/af1544e7
>>>
>>> Branch: refs/heads/master
>>> Commit: af1544e7b63d81a5998a4b3a1471586d63d72a4e
>>> Parents: 1a3bff4
>>> Author: Sebb <[email protected]>
>>> Authored: Sun Nov 20 23:19:55 2016 +0000
>>> Committer: Sebb <[email protected]>
>>> Committed: Sun Nov 20 23:19:55 2016 +0000
>>>
>>> ----------------------------------------------------------------------
>>>  tools/import-mbox.py | 59 +++++++++++++++++++++++++++--------------------
>>>  1 file changed, 34 insertions(+), 25 deletions(-)
>>> ----------------------------------------------------------------------
>>>
>>>
>>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/blob/af1544e7/tools/import-mbox.py
>>> ----------------------------------------------------------------------
>>> diff --git a/tools/import-mbox.py b/tools/import-mbox.py
>>> index 15f09ad..12bc0d1 100755
>>> --- a/tools/import-mbox.py
>>> +++ b/tools/import-mbox.py
>>> @@ -107,7 +107,9 @@ es = Elasticsearch([
>>>  rootURL = ""
>>>
>>>  class BulkThread(Thread):
>>> -    def assign(self, json, xes, dtype = 'mbox', wc = 'quorum'):
>>> +
>>> +    def assign(self, id, json, xes, dtype = 'mbox', wc = 'quorum'):
>>> +        self.id = id
>>>          self.json = json
>>>          self.xes = xes
>>>          self.dtype = dtype
>>> @@ -133,17 +135,24 @@ class BulkThread(Thread):
>>>          try:
>>>              helpers.bulk(self.xes, js_arr)
>>>          except Exception as err:
>>> -            print("Warning: Could not bulk insert: %s" % err)
>>> -        #print("Inserted %u entries" % len(js_arr))
>>> +            print("%d: Warning: Could not bulk insert: %s into %s" % 
>>> (self.id,err,self.dtype))
>>> +#         print("%d: Inserted %u entries into %s" % (self.id, 
>>> len(js_arr),self.dtype))
>>>
>>>
>>>  class SlurpThread(Thread):
>>>
>>> +    def __init__(self, index):
>>> +        self.id = index
>>> +        super(SlurpThread, self).__init__()
>>> +
>>> +    def printid(self,message):
>>> +        print("%d: %s" % (self.id, message))
>>> +
>>>      def run(self):
>>>          global block, y, es, lists, baddies, config, resendTo, timeout, 
>>> dedupped, dedup
>>>          ja = []
>>>          jas = []
>>> -        print("Thread started")
>>> +        self.printid("Thread started")
>>>          mla = None
>>>          ml = ""
>>>          mboxfile = ""
>>> @@ -152,16 +161,16 @@ class SlurpThread(Thread):
>>>          archie = archiver.Archiver(parseHTML = parseHTML)
>>>
>>>          while len(lists) > 0:
>>> -            print("%u elements left to slurp" % len(lists))
>>> +            self.printid("%u elements left to slurp" % len(lists))
>>>
>>>              block.acquire()
>>>              try:
>>>                  mla = lists.pop(0)
>>>                  if not mla:
>>> -                    print("Nothing more to do here")
>>> +                    self.printid("Nothing more to do here")
>>>                      return
>>>              except Exception as err:
>>> -                print("Could not pop list: %s" % err)
>>> +                self.printid("Could not pop list: %s" % err)
>>>                  return
>>>              finally:
>>>                  block.release()
>>> @@ -184,7 +193,7 @@ class SlurpThread(Thread):
>>>                  tmpname = mla[0]
>>>                  filename = mla[0]
>>>                  if filename.find(".gz") != -1:
>>> -                    print("Decompressing %s..." % filename)
>>> +                    self.printid("Decompressing %s..." % filename)
>>>                      try:
>>>                          with open(filename, "rb") as bf:
>>>                              bmd = bf.read()
>>> @@ -197,16 +206,16 @@ class SlurpThread(Thread):
>>>                              tmpname = tmpfile.name
>>>                              filename = tmpname
>>>                              dFile = True # Slated for deletion upon having 
>>> been read
>>> -                            print("%s -> %u bytes" % (tmpname, len(bmd)))
>>> +                            self.printid("%s -> %u bytes" % (tmpname, 
>>> len(bmd)))
>>>                      except Exception as err:
>>> -                        print("This wasn't a gzip file: %s" % err )
>>> -                print("Slurping %s" % filename)
>>> +                        self.printid("This wasn't a gzip file: %s" % err )
>>> +                self.printid("Slurping %s" % filename)
>>>                  messages = mailbox.mbox(tmpname)
>>>
>>>              else:
>>>                  ml = mla[0]
>>>                  mboxfile = mla[1]
>>> -                print("Slurping %s/%s" % (ml, mboxfile))
>>> +                self.printid("Slurping %s/%s" % (ml, mboxfile))
>>>                  m = re.match(r"(\d\d\d\d)(\d\d)", mboxfile)
>>>                  EY = 1997
>>>                  EM = 1
>>> @@ -232,7 +241,7 @@ class SlurpThread(Thread):
>>>                  if fromFilter and 'from' in message and 
>>> message['from'].find(fromFilter) == -1:
>>>                      continue
>>>                  if resendTo:
>>> -                    print("Delivering message %s via MTA" % 
>>> message['message-id'] if 'message-id' in message else '??')
>>> +                    self.printid("Delivering message %s via MTA" % 
>>> message['message-id'] if 'message-id' in message else '??')
>>>                      s = SMTP('localhost')
>>>                      try:
>>>                          if list_override:
>>> @@ -245,7 +254,7 @@ class SlurpThread(Thread):
>>>                      s.send_message(message, from_addr=None, 
>>> to_addrs=(resendTo))
>>>                      continue
>>>                  if (time.time() - stime > timeout): # break out after N 
>>> seconds, it shouldn't take this long..!
>>> -                    print("Whoa, this is taking way too long, ignoring %s 
>>> for now" % tmpname)
>>> +                    self.printid("Whoa, this is taking way too long, 
>>> ignoring %s for now" % tmpname)
>>>                      break
>>>
>>>                  json, contents = archie.compute_updates(list_override, 
>>> private, message)
>>> @@ -271,7 +280,7 @@ class SlurpThread(Thread):
>>>                          }
>>>                      )
>>>                      if res and len(res['hits']['hits']) > 0:
>>> -                        print("Dedupping %s" % json['message-id'])
>>> +                        self.printid("Dedupping %s" % json['message-id'])
>>>                          dedupped += 1
>>>                          continue
>>>
>>> @@ -305,43 +314,43 @@ class SlurpThread(Thread):
>>>                      if len(ja) >= 40:
>>>                          if not args.dry:
>>>                              bulk = BulkThread()
>>> -                            bulk.assign(ja, es, 'mbox')
>>> +                            bulk.assign(self.id, ja, es, 'mbox')
>>>                              bulk.insert()
>>>                          ja = []
>>>
>>>                          if not args.dry:
>>>                              bulks = BulkThread()
>>> -                            bulks.assign(jas, es, 'mbox_source')
>>> +                            bulks.assign(self.id, jas, es, 'mbox_source')
>>>                              bulks.insert()
>>>                          jas = []
>>>                  else:
>>> -                    print("Failed to parse: Return=%s Message-Id=%s" % 
>>> (message.get('Return-Path'), message.get('Message-Id')))
>>> +                    self.printid("Failed to parse: Return=%s 
>>> Message-Id=%s" % (message.get('Return-Path'), message.get('Message-Id')))
>>>                      bad += 1
>>>
>>>              if filebased:
>>> -                print("Parsed %u records (failed: %u) from %s" % (count, 
>>> bad, filename))
>>> +                self.printid("Parsed %u records (failed: %u) from %s" % 
>>> (count, bad, filename))
>>>                  if dFile:
>>>                      os.unlink(tmpname)
>>>              elif imap:
>>> -                print("Parsed %u records (failed: %u) from imap" % (count, 
>>> bad))
>>> +                self.printid("Parsed %u records (failed: %u) from imap" % 
>>> (count, bad))
>>>              else:
>>> -                print("Parsed %s/%s: %u records (failed: %u) from %s" % 
>>> (ml, mboxfile, count, bad, tmpname))
>>> +                self.printid("Parsed %s/%s: %u records (failed: %u) from 
>>> %s" % (ml, mboxfile, count, bad, tmpname))
>>>                  os.unlink(tmpname)
>>>
>>>              y += count
>>>              baddies += bad
>>>              if not args.dry:
>>>                  bulk = BulkThread()
>>> -                bulk.assign(ja, es, 'mbox')
>>> +                bulk.assign(self.id, ja, es, 'mbox')
>>>                  bulk.insert()
>>>              ja = []
>>>
>>>              if not args.dry:
>>>                  bulks = BulkThread()
>>> -                bulks.assign(jas, es, 'mbox_source')
>>> +                bulks.assign(self.id, jas, es, 'mbox_source')
>>>                  bulks.insert()
>>>              jas = []
>>> -        print("Done, %u elements left to slurp" % len(lists))
>>> +        self.printid("Done, %u elements left to slurp" % len(lists))
>>>
>>>  parser = argparse.ArgumentParser(description='Command line options.')
>>>  parser.add_argument('--source', dest='source', type=str, nargs=1,
>>> @@ -637,7 +646,7 @@ threads = []
>>>  cc = min(len(lists), int( multiprocessing.cpu_count() / 2) + 1)
>>>  print("Starting up to %u threads to fetch the %u %s lists" % (cc, 
>>> len(lists), project))
>>>  for i in range(1,cc+1):
>>> -    t = SlurpThread()
>>> +    t = SlurpThread(i)
>>>      threads.append(t)
>>>      t.start()
>>>      print("Started no. %u" % i)
>>>
>>>

Reply via email to