Hi all,
after the latest commits, I now get the following error when importing from mbox:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "import-mbox.py", line 314, in run
    bulk.assign(self.id, ja, es, 'mbox')
AttributeError: 'SlurpThread' object has no attribute 'id'
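(For reference, the commit quoted below makes the thread id a constructor argument, so any code path that still constructs a SlurpThread without an index, or otherwise skips the new __init__, will trip exactly this AttributeError. A minimal sketch of the intended pattern, simplified from import-mbox.py:)

```python
from threading import Thread

class SlurpThread(Thread):
    def __init__(self, index):
        # The id must be set before start(), since run() reads it.
        self.id = index
        super(SlurpThread, self).__init__()

    def run(self):
        # A SlurpThread constructed without an index never gets self.id,
        # so this line raises AttributeError inside the worker thread
        # (printed as "Exception in thread Thread-N", as above).
        print("%d: Thread started" % self.id)

t = SlurpThread(1)
t.start()
t.join()
```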

Regards.

On 21/11/2016 17:19, sebb wrote:
On 21 November 2016 at 11:52, Daniel Gruno <[email protected]> wrote:
On 11/21/2016 12:50 PM, sebb wrote:
On 21 November 2016 at 11:40, Francesco Chicchiriccò
<[email protected]> wrote:
Hi all,
I'm not sure, but it seems that the commit below broke my scheduled import from mbox:
It won't be that commit; most likely it's the fix for #251.

https://github.com/apache/incubator-ponymail/commit/1a3bff403166c917738fd02acefc988b909d4eae#diff-0102373f79eaa72ffaff3ce7675b6a43

This presumably means the archiver would have fallen over with the same e-mail.
Or there is an encoding problem with writing the mail to the mbox - or
reading it - so the importer is not seeing the same input as the
archiver.
The importer usually sees things as ASCII, whereas the archiver _can_
get fed input as unicode by postfix (I don't know why, but there it is).
That may explain the difference. I think as_bytes is a safer way to
archive, as it's binary.
That all depends on how the binary is generated.
As far as I can tell, the parsed message is not stored as binary, so
it has to be encoded to create the bytes.
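The failure mode is easy to reproduce in isolation (a sketch with a fabricated payload, not the actual offending message): a text part whose string payload still carries an undecodable byte as a surrogate, combined with a declared us-ascii charset, makes as_string() re-encode the payload and fail along exactly the traceback below, while as_bytes() restores the original bytes via surrogateescape:

```python
from email.message import Message

# Simulate a message parsed from bytes that did not decode cleanly:
# the raw byte 0xE9 survives in the str payload as the surrogate '\udce9'.
msg = Message()
msg['Content-Type'] = 'text/plain; charset="us-ascii"'
msg.set_payload("caf\udce9 in a part that claims to be ascii")

try:
    msg.as_string()       # generator re-encodes the payload to us-ascii
except UnicodeEncodeError as err:
    print("as_string() failed:", err)

raw = msg.as_bytes()      # surrogateescape turns '\udce9' back into 0xE9
print(b'caf\xe9' in raw)
```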

It would be useful to know what the message is that causes the issue.

If you can find it, I can take a look later.

Exception in thread Thread-1:
Traceback (most recent call last):
   File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
     self.run()
   File "import-mbox.py", line 297, in run
     'source': message.as_string()
   File "/usr/lib/python3.5/email/message.py", line 159, in as_string
     g.flatten(self, unixfrom=unixfrom)
   File "/usr/lib/python3.5/email/generator.py", line 115, in flatten
     self._write(msg)
   File "/usr/lib/python3.5/email/generator.py", line 181, in _write
     self._dispatch(msg)
   File "/usr/lib/python3.5/email/generator.py", line 214, in _dispatch
     meth(msg)
   File "/usr/lib/python3.5/email/generator.py", line 243, in _handle_text
     msg.set_payload(payload, charset)
   File "/usr/lib/python3.5/email/message.py", line 316, in set_payload
     payload = payload.encode(charset.output_charset)
UnicodeEncodeError: 'ascii' codec can't encode character '\ufffd' in position 3657: ordinal not in range(128)

Any hint / workaround?
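One possible workaround (an untested sketch with a hypothetical helper name, not necessarily what the project ended up doing): when building the 'source' field, fall back to the raw bytes whenever as_string() cannot re-encode the payload, then decode leniently so the document stays indexable:

```python
from email.message import Message

def mbox_source(message):
    # Hypothetical fallback helper: prefer the str form, but when the
    # payload cannot be re-encoded (e.g. surrogates in a part that
    # declares an ascii charset), take the raw bytes instead.
    try:
        return message.as_string()
    except UnicodeEncodeError:
        # as_bytes() restores the original raw bytes via surrogateescape;
        # decode with errors='replace' so nothing is lost outright.
        return message.as_bytes().decode('utf-8', errors='replace')

# demo: a part that declares us-ascii but carries the undecodable byte 0xE9
m = Message()
m['Content-Type'] = 'text/plain; charset="us-ascii"'
m.set_payload("caf\udce9 au lait")
src = mbox_source(m)
print(type(src).__name__)
```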

On 2016-11-21 00:20 (+0100), [email protected] wrote:
Repository: incubator-ponymail
Updated Branches:
   refs/heads/master 1a3bff403 -> af1544e7b


import-mbox.py messages need the thread number

This fixes #248

Project: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/commit/af1544e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/tree/af1544e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/diff/af1544e7

Branch: refs/heads/master
Commit: af1544e7b63d81a5998a4b3a1471586d63d72a4e
Parents: 1a3bff4
Author: Sebb <[email protected]>
Authored: Sun Nov 20 23:19:55 2016 +0000
Committer: Sebb <[email protected]>
Committed: Sun Nov 20 23:19:55 2016 +0000

----------------------------------------------------------------------
  tools/import-mbox.py | 59 +++++++++++++++++++++++++++--------------------
  1 file changed, 34 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ponymail/blob/af1544e7/tools/import-mbox.py
----------------------------------------------------------------------
diff --git a/tools/import-mbox.py b/tools/import-mbox.py
index 15f09ad..12bc0d1 100755
--- a/tools/import-mbox.py
+++ b/tools/import-mbox.py
@@ -107,7 +107,9 @@ es = Elasticsearch([
  rootURL = ""

  class BulkThread(Thread):
-    def assign(self, json, xes, dtype = 'mbox', wc = 'quorum'):
+
+    def assign(self, id, json, xes, dtype = 'mbox', wc = 'quorum'):
+        self.id = id
          self.json = json
          self.xes = xes
          self.dtype = dtype
@@ -133,17 +135,24 @@ class BulkThread(Thread):
          try:
              helpers.bulk(self.xes, js_arr)
          except Exception as err:
-            print("Warning: Could not bulk insert: %s" % err)
-        #print("Inserted %u entries" % len(js_arr))
+            print("%d: Warning: Could not bulk insert: %s into %s" % (self.id,err,self.dtype))
+#         print("%d: Inserted %u entries into %s" % (self.id, len(js_arr),self.dtype))


  class SlurpThread(Thread):

+    def __init__(self, index):
+        self.id = index
+        super(SlurpThread, self).__init__()
+
+    def printid(self,message):
+        print("%d: %s" % (self.id, message))
+
      def run(self):
          global block, y, es, lists, baddies, config, resendTo, timeout, dedupped, dedup
          ja = []
          jas = []
-        print("Thread started")
+        self.printid("Thread started")
          mla = None
          ml = ""
          mboxfile = ""
@@ -152,16 +161,16 @@ class SlurpThread(Thread):
          archie = archiver.Archiver(parseHTML = parseHTML)

          while len(lists) > 0:
-            print("%u elements left to slurp" % len(lists))
+            self.printid("%u elements left to slurp" % len(lists))

              block.acquire()
              try:
                  mla = lists.pop(0)
                  if not mla:
-                    print("Nothing more to do here")
+                    self.printid("Nothing more to do here")
                      return
              except Exception as err:
-                print("Could not pop list: %s" % err)
+                self.printid("Could not pop list: %s" % err)
                  return
              finally:
                  block.release()
@@ -184,7 +193,7 @@ class SlurpThread(Thread):
                  tmpname = mla[0]
                  filename = mla[0]
                  if filename.find(".gz") != -1:
-                    print("Decompressing %s..." % filename)
+                    self.printid("Decompressing %s..." % filename)
                      try:
                          with open(filename, "rb") as bf:
                              bmd = bf.read()
@@ -197,16 +206,16 @@ class SlurpThread(Thread):
                              tmpname = tmpfile.name
                              filename = tmpname
                              dFile = True # Slated for deletion upon having been read
-                            print("%s -> %u bytes" % (tmpname, len(bmd)))
+                            self.printid("%s -> %u bytes" % (tmpname, len(bmd)))
                      except Exception as err:
-                        print("This wasn't a gzip file: %s" % err )
-                print("Slurping %s" % filename)
+                        self.printid("This wasn't a gzip file: %s" % err )
+                self.printid("Slurping %s" % filename)
                  messages = mailbox.mbox(tmpname)

              else:
                  ml = mla[0]
                  mboxfile = mla[1]
-                print("Slurping %s/%s" % (ml, mboxfile))
+                self.printid("Slurping %s/%s" % (ml, mboxfile))
                  m = re.match(r"(\d\d\d\d)(\d\d)", mboxfile)
                  EY = 1997
                  EM = 1
@@ -232,7 +241,7 @@ class SlurpThread(Thread):
                  if fromFilter and 'from' in message and message['from'].find(fromFilter) == -1:
                      continue
                  if resendTo:
-                    print("Delivering message %s via MTA" % message['message-id'] if 'message-id' in message else '??')
+                    self.printid("Delivering message %s via MTA" % message['message-id'] if 'message-id' in message else '??')
                      s = SMTP('localhost')
                      try:
                          if list_override:
@@ -245,7 +254,7 @@ class SlurpThread(Thread):
                      s.send_message(message, from_addr=None, to_addrs=(resendTo))
                      continue
                  if (time.time() - stime > timeout): # break out after N seconds, it shouldn't take this long..!
-                    print("Whoa, this is taking way too long, ignoring %s for now" % tmpname)
+                    self.printid("Whoa, this is taking way too long, ignoring %s for now" % tmpname)
                      break

                  json, contents = archie.compute_updates(list_override, private, message)
@@ -271,7 +280,7 @@ class SlurpThread(Thread):
                          }
                      )
                      if res and len(res['hits']['hits']) > 0:
-                        print("Dedupping %s" % json['message-id'])
+                        self.printid("Dedupping %s" % json['message-id'])
                          dedupped += 1
                          continue

@@ -305,43 +314,43 @@ class SlurpThread(Thread):
                      if len(ja) >= 40:
                          if not args.dry:
                              bulk = BulkThread()
-                            bulk.assign(ja, es, 'mbox')
+                            bulk.assign(self.id, ja, es, 'mbox')
                              bulk.insert()
                          ja = []

                          if not args.dry:
                              bulks = BulkThread()
-                            bulks.assign(jas, es, 'mbox_source')
+                            bulks.assign(self.id, jas, es, 'mbox_source')
                              bulks.insert()
                          jas = []
                  else:
-                    print("Failed to parse: Return=%s Message-Id=%s" % (message.get('Return-Path'), message.get('Message-Id')))
+                    self.printid("Failed to parse: Return=%s Message-Id=%s" % (message.get('Return-Path'), message.get('Message-Id')))
                      bad += 1

              if filebased:
-                print("Parsed %u records (failed: %u) from %s" % (count, bad, filename))
+                self.printid("Parsed %u records (failed: %u) from %s" % (count, bad, filename))
                  if dFile:
                      os.unlink(tmpname)
              elif imap:
-                print("Parsed %u records (failed: %u) from imap" % (count, bad))
+                self.printid("Parsed %u records (failed: %u) from imap" % (count, bad))
              else:
-                print("Parsed %s/%s: %u records (failed: %u) from %s" % (ml, mboxfile, count, bad, tmpname))
+                self.printid("Parsed %s/%s: %u records (failed: %u) from %s" % (ml, mboxfile, count, bad, tmpname))
                  os.unlink(tmpname)

              y += count
              baddies += bad
              if not args.dry:
                  bulk = BulkThread()
-                bulk.assign(ja, es, 'mbox')
+                bulk.assign(self.id, ja, es, 'mbox')
                  bulk.insert()
              ja = []

              if not args.dry:
                  bulks = BulkThread()
-                bulks.assign(jas, es, 'mbox_source')
+                bulks.assign(self.id, jas, es, 'mbox_source')
                  bulks.insert()
              jas = []
-        print("Done, %u elements left to slurp" % len(lists))
+        self.printid("Done, %u elements left to slurp" % len(lists))

  parser = argparse.ArgumentParser(description='Command line options.')
  parser.add_argument('--source', dest='source', type=str, nargs=1,
@@ -637,7 +646,7 @@ threads = []
  cc = min(len(lists), int( multiprocessing.cpu_count() / 2) + 1)
  print("Starting up to %u threads to fetch the %u %s lists" % (cc, len(lists), project))
  for i in range(1,cc+1):
-    t = SlurpThread()
+    t = SlurpThread(i)
      threads.append(t)
      t.start()
      print("Started no. %u" % i)





--
Francesco Chicchiriccò

Tirasa - Open Source Excellence
http://www.tirasa.net/

Member at The Apache Software Foundation
Syncope, Cocoon, Olingo, CXF, OpenJPA, PonyMail
http://home.apache.org/~ilgrosso/
