On 11/21/2016 12:50 PM, sebb wrote: > On 21 November 2016 at 11:40, Francesco Chicchiriccò > <[email protected]> wrote: >> Hi all, >> not sure but it seems that the commit below broke my scheduled import from >> mbox: > > It won't be that commit, most likely the fix for #251 > > https://github.com/apache/incubator-ponymail/commit/1a3bff403166c917738fd02acefc988b909d4eae#diff-0102373f79eaa72ffaff3ce7675b6a43 > > This presumably means the archiver would have fallen over with the same > e-mail. > Or there is an encoding problem with writing the mail to the mbox - or > reading it - so the importer is not seeing the same input as the > archiver.
The importer usually sees its input as ASCII, whereas the archiver _can_ be fed input as Unicode by postfix (I don't know why, but there it is). This may explain why the importer is not seeing the same input as the archiver. I think as_bytes() is a safer way to archive than as_string(), as it's binary. > > It would be useful to know what the message is that causes the issue. > > If you can find it I can take a look later. > >> Exception in thread Thread-1: >> Traceback (most recent call last): >> File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner >> self.run() >> File "import-mbox.py", line 297, in run >> 'source': message.as_string() >> File "/usr/lib/python3.5/email/message.py", line 159, in as_string >> g.flatten(self, unixfrom=unixfrom) >> File "/usr/lib/python3.5/email/generator.py", line 115, in flatten >> self._write(msg) >> File "/usr/lib/python3.5/email/generator.py", line 181, in _write >> self._dispatch(msg) >> File "/usr/lib/python3.5/email/generator.py", line 214, in _dispatch >> meth(msg) >> File "/usr/lib/python3.5/email/generator.py", line 243, in _handle_text >> msg.set_payload(payload, charset) >> File "/usr/lib/python3.5/email/message.py", line 316, in set_payload >> payload = payload.encode(charset.output_charset) >> UnicodeEncodeError: 'ascii' codec can't encode character '\ufffd' in >> position 3657: ordinal not in range(128) >> >> Any hint / workaround? 
>> >> On 2016-11-21 00:20 (+0100), [email protected] wrote: >>> Repository: incubator-ponymail >>> Updated Branches: >>> refs/heads/master 1a3bff403 -> af1544e7b >>> >>> >>> import-mbox.py messages need the thread number >>> >>> This fixes #248 >>> >>> Project: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/repo >>> Commit: >>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/commit/af1544e7 >>> Tree: >>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/tree/af1544e7 >>> Diff: >>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/diff/af1544e7 >>> >>> Branch: refs/heads/master >>> Commit: af1544e7b63d81a5998a4b3a1471586d63d72a4e >>> Parents: 1a3bff4 >>> Author: Sebb <[email protected]> >>> Authored: Sun Nov 20 23:19:55 2016 +0000 >>> Committer: Sebb <[email protected]> >>> Committed: Sun Nov 20 23:19:55 2016 +0000 >>> >>> ---------------------------------------------------------------------- >>> tools/import-mbox.py | 59 +++++++++++++++++++++++++++-------------------- >>> 1 file changed, 34 insertions(+), 25 deletions(-) >>> ---------------------------------------------------------------------- >>> >>> >>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/blob/af1544e7/tools/import-mbox.py >>> ---------------------------------------------------------------------- >>> diff --git a/tools/import-mbox.py b/tools/import-mbox.py >>> index 15f09ad..12bc0d1 100755 >>> --- a/tools/import-mbox.py >>> +++ b/tools/import-mbox.py >>> @@ -107,7 +107,9 @@ es = Elasticsearch([ >>> rootURL = "" >>> >>> class BulkThread(Thread): >>> - def assign(self, json, xes, dtype = 'mbox', wc = 'quorum'): >>> + >>> + def assign(self, id, json, xes, dtype = 'mbox', wc = 'quorum'): >>> + self.id = id >>> self.json = json >>> self.xes = xes >>> self.dtype = dtype >>> @@ -133,17 +135,24 @@ class BulkThread(Thread): >>> try: >>> helpers.bulk(self.xes, js_arr) >>> except Exception as err: >>> - print("Warning: Could not bulk insert: %s" % err) >>> - 
#print("Inserted %u entries" % len(js_arr)) >>> + print("%d: Warning: Could not bulk insert: %s into %s" % >>> (self.id,err,self.dtype)) >>> +# print("%d: Inserted %u entries into %s" % (self.id, >>> len(js_arr),self.dtype)) >>> >>> >>> class SlurpThread(Thread): >>> >>> + def __init__(self, index): >>> + self.id = index >>> + super(SlurpThread, self).__init__() >>> + >>> + def printid(self,message): >>> + print("%d: %s" % (self.id, message)) >>> + >>> def run(self): >>> global block, y, es, lists, baddies, config, resendTo, timeout, >>> dedupped, dedup >>> ja = [] >>> jas = [] >>> - print("Thread started") >>> + self.printid("Thread started") >>> mla = None >>> ml = "" >>> mboxfile = "" >>> @@ -152,16 +161,16 @@ class SlurpThread(Thread): >>> archie = archiver.Archiver(parseHTML = parseHTML) >>> >>> while len(lists) > 0: >>> - print("%u elements left to slurp" % len(lists)) >>> + self.printid("%u elements left to slurp" % len(lists)) >>> >>> block.acquire() >>> try: >>> mla = lists.pop(0) >>> if not mla: >>> - print("Nothing more to do here") >>> + self.printid("Nothing more to do here") >>> return >>> except Exception as err: >>> - print("Could not pop list: %s" % err) >>> + self.printid("Could not pop list: %s" % err) >>> return >>> finally: >>> block.release() >>> @@ -184,7 +193,7 @@ class SlurpThread(Thread): >>> tmpname = mla[0] >>> filename = mla[0] >>> if filename.find(".gz") != -1: >>> - print("Decompressing %s..." % filename) >>> + self.printid("Decompressing %s..." 
% filename) >>> try: >>> with open(filename, "rb") as bf: >>> bmd = bf.read() >>> @@ -197,16 +206,16 @@ class SlurpThread(Thread): >>> tmpname = tmpfile.name >>> filename = tmpname >>> dFile = True # Slated for deletion upon having >>> been read >>> - print("%s -> %u bytes" % (tmpname, len(bmd))) >>> + self.printid("%s -> %u bytes" % (tmpname, >>> len(bmd))) >>> except Exception as err: >>> - print("This wasn't a gzip file: %s" % err ) >>> - print("Slurping %s" % filename) >>> + self.printid("This wasn't a gzip file: %s" % err ) >>> + self.printid("Slurping %s" % filename) >>> messages = mailbox.mbox(tmpname) >>> >>> else: >>> ml = mla[0] >>> mboxfile = mla[1] >>> - print("Slurping %s/%s" % (ml, mboxfile)) >>> + self.printid("Slurping %s/%s" % (ml, mboxfile)) >>> m = re.match(r"(\d\d\d\d)(\d\d)", mboxfile) >>> EY = 1997 >>> EM = 1 >>> @@ -232,7 +241,7 @@ class SlurpThread(Thread): >>> if fromFilter and 'from' in message and >>> message['from'].find(fromFilter) == -1: >>> continue >>> if resendTo: >>> - print("Delivering message %s via MTA" % >>> message['message-id'] if 'message-id' in message else '??') >>> + self.printid("Delivering message %s via MTA" % >>> message['message-id'] if 'message-id' in message else '??') >>> s = SMTP('localhost') >>> try: >>> if list_override: >>> @@ -245,7 +254,7 @@ class SlurpThread(Thread): >>> s.send_message(message, from_addr=None, >>> to_addrs=(resendTo)) >>> continue >>> if (time.time() - stime > timeout): # break out after N >>> seconds, it shouldn't take this long..! 
>>> - print("Whoa, this is taking way too long, ignoring %s >>> for now" % tmpname) >>> + self.printid("Whoa, this is taking way too long, >>> ignoring %s for now" % tmpname) >>> break >>> >>> json, contents = archie.compute_updates(list_override, >>> private, message) >>> @@ -271,7 +280,7 @@ class SlurpThread(Thread): >>> } >>> ) >>> if res and len(res['hits']['hits']) > 0: >>> - print("Dedupping %s" % json['message-id']) >>> + self.printid("Dedupping %s" % json['message-id']) >>> dedupped += 1 >>> continue >>> >>> @@ -305,43 +314,43 @@ class SlurpThread(Thread): >>> if len(ja) >= 40: >>> if not args.dry: >>> bulk = BulkThread() >>> - bulk.assign(ja, es, 'mbox') >>> + bulk.assign(self.id, ja, es, 'mbox') >>> bulk.insert() >>> ja = [] >>> >>> if not args.dry: >>> bulks = BulkThread() >>> - bulks.assign(jas, es, 'mbox_source') >>> + bulks.assign(self.id, jas, es, 'mbox_source') >>> bulks.insert() >>> jas = [] >>> else: >>> - print("Failed to parse: Return=%s Message-Id=%s" % >>> (message.get('Return-Path'), message.get('Message-Id'))) >>> + self.printid("Failed to parse: Return=%s >>> Message-Id=%s" % (message.get('Return-Path'), message.get('Message-Id'))) >>> bad += 1 >>> >>> if filebased: >>> - print("Parsed %u records (failed: %u) from %s" % (count, >>> bad, filename)) >>> + self.printid("Parsed %u records (failed: %u) from %s" % >>> (count, bad, filename)) >>> if dFile: >>> os.unlink(tmpname) >>> elif imap: >>> - print("Parsed %u records (failed: %u) from imap" % (count, >>> bad)) >>> + self.printid("Parsed %u records (failed: %u) from imap" % >>> (count, bad)) >>> else: >>> - print("Parsed %s/%s: %u records (failed: %u) from %s" % >>> (ml, mboxfile, count, bad, tmpname)) >>> + self.printid("Parsed %s/%s: %u records (failed: %u) from >>> %s" % (ml, mboxfile, count, bad, tmpname)) >>> os.unlink(tmpname) >>> >>> y += count >>> baddies += bad >>> if not args.dry: >>> bulk = BulkThread() >>> - bulk.assign(ja, es, 'mbox') >>> + bulk.assign(self.id, ja, es, 'mbox') 
>>> bulk.insert() >>> ja = [] >>> >>> if not args.dry: >>> bulks = BulkThread() >>> - bulks.assign(jas, es, 'mbox_source') >>> + bulks.assign(self.id, jas, es, 'mbox_source') >>> bulks.insert() >>> jas = [] >>> - print("Done, %u elements left to slurp" % len(lists)) >>> + self.printid("Done, %u elements left to slurp" % len(lists)) >>> >>> parser = argparse.ArgumentParser(description='Command line options.') >>> parser.add_argument('--source', dest='source', type=str, nargs=1, >>> @@ -637,7 +646,7 @@ threads = [] >>> cc = min(len(lists), int( multiprocessing.cpu_count() / 2) + 1) >>> print("Starting up to %u threads to fetch the %u %s lists" % (cc, >>> len(lists), project)) >>> for i in range(1,cc+1): >>> - t = SlurpThread() >>> + t = SlurpThread(i) >>> threads.append(t) >>> t.start() >>> print("Started no. %u" % i) >>> >>>
