On 21 November 2016 at 11:52, Daniel Gruno <[email protected]> wrote:
> On 11/21/2016 12:50 PM, sebb wrote:
>> On 21 November 2016 at 11:40, Francesco Chicchiriccò
>> <[email protected]> wrote:
>>> Hi all,
>>> not sure but it seems that the commit below broke my scheduled import from 
>>> mbox:
>>
>> It won't be that commit, most likely the fix for #251
>>
>> https://github.com/apache/incubator-ponymail/commit/1a3bff403166c917738fd02acefc988b909d4eae#diff-0102373f79eaa72ffaff3ce7675b6a43
>>
>> This presumably means the archiver would have fallen over with the same 
>> e-mail.
>> Or there is an encoding problem with writing the mail to the mbox - or
>> reading it - so the importer is not seeing the same input as the
>> archiver.
>
> The importer usually sees things as ASCII, whereas the archiver _can_
> get fed input as unicode by postfix (I don't know why, but there it is).
> This may explain why. I think as_bytes is a safer way to archive, as
> it's binary.

That all depends how the binary is generated.
As far as I can tell, the parsed message is not stored as binary, so
it has to be encoded to create the bytes.

>>
>> It would be useful to know what the message is that causes the issue.
>>
>> If you can find it I can take a look later.
>>
>>> Exception in thread Thread-1:
>>> Traceback (most recent call last):
>>>   File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
>>>     self.run()
>>>   File "import-mbox.py", line 297, in run
>>>     'source': message.as_string()
>>>   File "/usr/lib/python3.5/email/message.py", line 159, in as_string
>>>     g.flatten(self, unixfrom=unixfrom)
>>>   File "/usr/lib/python3.5/email/generator.py", line 115, in flatten
>>>     self._write(msg)
>>>   File "/usr/lib/python3.5/email/generator.py", line 181, in _write
>>>     self._dispatch(msg)
>>>   File "/usr/lib/python3.5/email/generator.py", line 214, in _dispatch
>>>     meth(msg)
>>>   File "/usr/lib/python3.5/email/generator.py", line 243, in _handle_text
>>>     msg.set_payload(payload, charset)
>>>   File "/usr/lib/python3.5/email/message.py", line 316, in set_payload
>>>     payload = payload.encode(charset.output_charset)
>>> UnicodeEncodeError: 'ascii' codec can't encode character '\ufffd' in 
>>> position 3657: ordinal not in range(128)
>>>
>>> Any hint / workaround?
>>>
>>> On 2016-11-21 00:20 (+0100), [email protected] wrote:
>>>> Repository: incubator-ponymail
>>>> Updated Branches:
>>>>   refs/heads/master 1a3bff403 -> af1544e7b
>>>>
>>>>
>>>> import-mbox.py messages need the thread number
>>>>
>>>> This fixes #248
>>>>
>>>> Project: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/repo
>>>> Commit: 
>>>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/commit/af1544e7
>>>> Tree: 
>>>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/tree/af1544e7
>>>> Diff: 
>>>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/diff/af1544e7
>>>>
>>>> Branch: refs/heads/master
>>>> Commit: af1544e7b63d81a5998a4b3a1471586d63d72a4e
>>>> Parents: 1a3bff4
>>>> Author: Sebb <[email protected]>
>>>> Authored: Sun Nov 20 23:19:55 2016 +0000
>>>> Committer: Sebb <[email protected]>
>>>> Committed: Sun Nov 20 23:19:55 2016 +0000
>>>>
>>>> ----------------------------------------------------------------------
>>>>  tools/import-mbox.py | 59 +++++++++++++++++++++++++++--------------------
>>>>  1 file changed, 34 insertions(+), 25 deletions(-)
>>>> ----------------------------------------------------------------------
>>>>
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/blob/af1544e7/tools/import-mbox.py
>>>> ----------------------------------------------------------------------
>>>> diff --git a/tools/import-mbox.py b/tools/import-mbox.py
>>>> index 15f09ad..12bc0d1 100755
>>>> --- a/tools/import-mbox.py
>>>> +++ b/tools/import-mbox.py
>>>> @@ -107,7 +107,9 @@ es = Elasticsearch([
>>>>  rootURL = ""
>>>>
>>>>  class BulkThread(Thread):
>>>> -    def assign(self, json, xes, dtype = 'mbox', wc = 'quorum'):
>>>> +
>>>> +    def assign(self, id, json, xes, dtype = 'mbox', wc = 'quorum'):
>>>> +        self.id = id
>>>>          self.json = json
>>>>          self.xes = xes
>>>>          self.dtype = dtype
>>>> @@ -133,17 +135,24 @@ class BulkThread(Thread):
>>>>          try:
>>>>              helpers.bulk(self.xes, js_arr)
>>>>          except Exception as err:
>>>> -            print("Warning: Could not bulk insert: %s" % err)
>>>> -        #print("Inserted %u entries" % len(js_arr))
>>>> +            print("%d: Warning: Could not bulk insert: %s into %s" % 
>>>> (self.id,err,self.dtype))
>>>> +#         print("%d: Inserted %u entries into %s" % (self.id, 
>>>> len(js_arr),self.dtype))
>>>>
>>>>
>>>>  class SlurpThread(Thread):
>>>>
>>>> +    def __init__(self, index):
>>>> +        self.id = index
>>>> +        super(SlurpThread, self).__init__()
>>>> +
>>>> +    def printid(self,message):
>>>> +        print("%d: %s" % (self.id, message))
>>>> +
>>>>      def run(self):
>>>>          global block, y, es, lists, baddies, config, resendTo, timeout, 
>>>> dedupped, dedup
>>>>          ja = []
>>>>          jas = []
>>>> -        print("Thread started")
>>>> +        self.printid("Thread started")
>>>>          mla = None
>>>>          ml = ""
>>>>          mboxfile = ""
>>>> @@ -152,16 +161,16 @@ class SlurpThread(Thread):
>>>>          archie = archiver.Archiver(parseHTML = parseHTML)
>>>>
>>>>          while len(lists) > 0:
>>>> -            print("%u elements left to slurp" % len(lists))
>>>> +            self.printid("%u elements left to slurp" % len(lists))
>>>>
>>>>              block.acquire()
>>>>              try:
>>>>                  mla = lists.pop(0)
>>>>                  if not mla:
>>>> -                    print("Nothing more to do here")
>>>> +                    self.printid("Nothing more to do here")
>>>>                      return
>>>>              except Exception as err:
>>>> -                print("Could not pop list: %s" % err)
>>>> +                self.printid("Could not pop list: %s" % err)
>>>>                  return
>>>>              finally:
>>>>                  block.release()
>>>> @@ -184,7 +193,7 @@ class SlurpThread(Thread):
>>>>                  tmpname = mla[0]
>>>>                  filename = mla[0]
>>>>                  if filename.find(".gz") != -1:
>>>> -                    print("Decompressing %s..." % filename)
>>>> +                    self.printid("Decompressing %s..." % filename)
>>>>                      try:
>>>>                          with open(filename, "rb") as bf:
>>>>                              bmd = bf.read()
>>>> @@ -197,16 +206,16 @@ class SlurpThread(Thread):
>>>>                              tmpname = tmpfile.name
>>>>                              filename = tmpname
>>>>                              dFile = True # Slated for deletion upon 
>>>> having been read
>>>> -                            print("%s -> %u bytes" % (tmpname, len(bmd)))
>>>> +                            self.printid("%s -> %u bytes" % (tmpname, 
>>>> len(bmd)))
>>>>                      except Exception as err:
>>>> -                        print("This wasn't a gzip file: %s" % err )
>>>> -                print("Slurping %s" % filename)
>>>> +                        self.printid("This wasn't a gzip file: %s" % err )
>>>> +                self.printid("Slurping %s" % filename)
>>>>                  messages = mailbox.mbox(tmpname)
>>>>
>>>>              else:
>>>>                  ml = mla[0]
>>>>                  mboxfile = mla[1]
>>>> -                print("Slurping %s/%s" % (ml, mboxfile))
>>>> +                self.printid("Slurping %s/%s" % (ml, mboxfile))
>>>>                  m = re.match(r"(\d\d\d\d)(\d\d)", mboxfile)
>>>>                  EY = 1997
>>>>                  EM = 1
>>>> @@ -232,7 +241,7 @@ class SlurpThread(Thread):
>>>>                  if fromFilter and 'from' in message and 
>>>> message['from'].find(fromFilter) == -1:
>>>>                      continue
>>>>                  if resendTo:
>>>> -                    print("Delivering message %s via MTA" % 
>>>> message['message-id'] if 'message-id' in message else '??')
>>>> +                    self.printid("Delivering message %s via MTA" % 
>>>> message['message-id'] if 'message-id' in message else '??')
>>>>                      s = SMTP('localhost')
>>>>                      try:
>>>>                          if list_override:
>>>> @@ -245,7 +254,7 @@ class SlurpThread(Thread):
>>>>                      s.send_message(message, from_addr=None, 
>>>> to_addrs=(resendTo))
>>>>                      continue
>>>>                  if (time.time() - stime > timeout): # break out after N 
>>>> seconds, it shouldn't take this long..!
>>>> -                    print("Whoa, this is taking way too long, ignoring %s 
>>>> for now" % tmpname)
>>>> +                    self.printid("Whoa, this is taking way too long, 
>>>> ignoring %s for now" % tmpname)
>>>>                      break
>>>>
>>>>                  json, contents = archie.compute_updates(list_override, 
>>>> private, message)
>>>> @@ -271,7 +280,7 @@ class SlurpThread(Thread):
>>>>                          }
>>>>                      )
>>>>                      if res and len(res['hits']['hits']) > 0:
>>>> -                        print("Dedupping %s" % json['message-id'])
>>>> +                        self.printid("Dedupping %s" % json['message-id'])
>>>>                          dedupped += 1
>>>>                          continue
>>>>
>>>> @@ -305,43 +314,43 @@ class SlurpThread(Thread):
>>>>                      if len(ja) >= 40:
>>>>                          if not args.dry:
>>>>                              bulk = BulkThread()
>>>> -                            bulk.assign(ja, es, 'mbox')
>>>> +                            bulk.assign(self.id, ja, es, 'mbox')
>>>>                              bulk.insert()
>>>>                          ja = []
>>>>
>>>>                          if not args.dry:
>>>>                              bulks = BulkThread()
>>>> -                            bulks.assign(jas, es, 'mbox_source')
>>>> +                            bulks.assign(self.id, jas, es, 'mbox_source')
>>>>                              bulks.insert()
>>>>                          jas = []
>>>>                  else:
>>>> -                    print("Failed to parse: Return=%s Message-Id=%s" % 
>>>> (message.get('Return-Path'), message.get('Message-Id')))
>>>> +                    self.printid("Failed to parse: Return=%s 
>>>> Message-Id=%s" % (message.get('Return-Path'), message.get('Message-Id')))
>>>>                      bad += 1
>>>>
>>>>              if filebased:
>>>> -                print("Parsed %u records (failed: %u) from %s" % (count, 
>>>> bad, filename))
>>>> +                self.printid("Parsed %u records (failed: %u) from %s" % 
>>>> (count, bad, filename))
>>>>                  if dFile:
>>>>                      os.unlink(tmpname)
>>>>              elif imap:
>>>> -                print("Parsed %u records (failed: %u) from imap" % 
>>>> (count, bad))
>>>> +                self.printid("Parsed %u records (failed: %u) from imap" % 
>>>> (count, bad))
>>>>              else:
>>>> -                print("Parsed %s/%s: %u records (failed: %u) from %s" % 
>>>> (ml, mboxfile, count, bad, tmpname))
>>>> +                self.printid("Parsed %s/%s: %u records (failed: %u) from 
>>>> %s" % (ml, mboxfile, count, bad, tmpname))
>>>>                  os.unlink(tmpname)
>>>>
>>>>              y += count
>>>>              baddies += bad
>>>>              if not args.dry:
>>>>                  bulk = BulkThread()
>>>> -                bulk.assign(ja, es, 'mbox')
>>>> +                bulk.assign(self.id, ja, es, 'mbox')
>>>>                  bulk.insert()
>>>>              ja = []
>>>>
>>>>              if not args.dry:
>>>>                  bulks = BulkThread()
>>>> -                bulks.assign(jas, es, 'mbox_source')
>>>> +                bulks.assign(self.id, jas, es, 'mbox_source')
>>>>                  bulks.insert()
>>>>              jas = []
>>>> -        print("Done, %u elements left to slurp" % len(lists))
>>>> +        self.printid("Done, %u elements left to slurp" % len(lists))
>>>>
>>>>  parser = argparse.ArgumentParser(description='Command line options.')
>>>>  parser.add_argument('--source', dest='source', type=str, nargs=1,
>>>> @@ -637,7 +646,7 @@ threads = []
>>>>  cc = min(len(lists), int( multiprocessing.cpu_count() / 2) + 1)
>>>>  print("Starting up to %u threads to fetch the %u %s lists" % (cc, 
>>>> len(lists), project))
>>>>  for i in range(1,cc+1):
>>>> -    t = SlurpThread()
>>>> +    t = SlurpThread(i)
>>>>      threads.append(t)
>>>>      t.start()
>>>>      print("Started no. %u" % i)
>>>>
>>>>
>

Reply via email to