On 21 November 2016 at 11:52, Daniel Gruno <[email protected]> wrote: > On 11/21/2016 12:50 PM, sebb wrote: >> On 21 November 2016 at 11:40, Francesco Chicchiriccò >> <[email protected]> wrote: >>> Hi all, >>> not sure but it seems that the commit below broke my scheduled import from >>> mbox: >> >> It won't be that commit, most likely the fix for #251 >> >> https://github.com/apache/incubator-ponymail/commit/1a3bff403166c917738fd02acefc988b909d4eae#diff-0102373f79eaa72ffaff3ce7675b6a43 >> >> This presumably means the archiver would have fallen over with the same >> e-mail. >> Or there is an encoding problem with writing the mail to the mbox - or >> reading it - so the importer is not seeing the same input as the >> archiver. > > The importer usually sees things as ASCII, whereas the archiver _can_ > get fed input as unicode by postfix (I don't know why, but there it is). > This may explain why. I think as_bytes is a safer way to archive, as > it's binary.
That all depends on how the binary is generated. As far as I can tell, the parsed message is not stored as binary, so it has to be encoded to create the bytes. >> >> It would be useful to know what the message is that causes the issue. >> >> If you can find it I can take a look later. >> >>> Exception in thread Thread-1: >>> Traceback (most recent call last): >>> File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner >>> self.run() >>> File "import-mbox.py", line 297, in run >>> 'source': message.as_string() >>> File "/usr/lib/python3.5/email/message.py", line 159, in as_string >>> g.flatten(self, unixfrom=unixfrom) >>> File "/usr/lib/python3.5/email/generator.py", line 115, in flatten >>> self._write(msg) >>> File "/usr/lib/python3.5/email/generator.py", line 181, in _write >>> self._dispatch(msg) >>> File "/usr/lib/python3.5/email/generator.py", line 214, in _dispatch >>> meth(msg) >>> File "/usr/lib/python3.5/email/generator.py", line 243, in _handle_text >>> msg.set_payload(payload, charset) >>> File "/usr/lib/python3.5/email/message.py", line 316, in set_payload >>> payload = payload.encode(charset.output_charset) >>> UnicodeEncodeError: 'ascii' codec can't encode character '\ufffd' in >>> position 3657: ordinal not in range(128) >>> >>> Any hint / workaround? 
>>> >>> On 2016-11-21 00:20 (+0100), [email protected] wrote: >>>> Repository: incubator-ponymail >>>> Updated Branches: >>>> refs/heads/master 1a3bff403 -> af1544e7b >>>> >>>> >>>> import-mbox.py messages need the thread number >>>> >>>> This fixes #248 >>>> >>>> Project: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/repo >>>> Commit: >>>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/commit/af1544e7 >>>> Tree: >>>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/tree/af1544e7 >>>> Diff: >>>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/diff/af1544e7 >>>> >>>> Branch: refs/heads/master >>>> Commit: af1544e7b63d81a5998a4b3a1471586d63d72a4e >>>> Parents: 1a3bff4 >>>> Author: Sebb <[email protected]> >>>> Authored: Sun Nov 20 23:19:55 2016 +0000 >>>> Committer: Sebb <[email protected]> >>>> Committed: Sun Nov 20 23:19:55 2016 +0000 >>>> >>>> ---------------------------------------------------------------------- >>>> tools/import-mbox.py | 59 +++++++++++++++++++++++++++-------------------- >>>> 1 file changed, 34 insertions(+), 25 deletions(-) >>>> ---------------------------------------------------------------------- >>>> >>>> >>>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/blob/af1544e7/tools/import-mbox.py >>>> ---------------------------------------------------------------------- >>>> diff --git a/tools/import-mbox.py b/tools/import-mbox.py >>>> index 15f09ad..12bc0d1 100755 >>>> --- a/tools/import-mbox.py >>>> +++ b/tools/import-mbox.py >>>> @@ -107,7 +107,9 @@ es = Elasticsearch([ >>>> rootURL = "" >>>> >>>> class BulkThread(Thread): >>>> - def assign(self, json, xes, dtype = 'mbox', wc = 'quorum'): >>>> + >>>> + def assign(self, id, json, xes, dtype = 'mbox', wc = 'quorum'): >>>> + self.id = id >>>> self.json = json >>>> self.xes = xes >>>> self.dtype = dtype >>>> @@ -133,17 +135,24 @@ class BulkThread(Thread): >>>> try: >>>> helpers.bulk(self.xes, js_arr) >>>> except Exception as err: >>>> - 
print("Warning: Could not bulk insert: %s" % err) >>>> - #print("Inserted %u entries" % len(js_arr)) >>>> + print("%d: Warning: Could not bulk insert: %s into %s" % >>>> (self.id,err,self.dtype)) >>>> +# print("%d: Inserted %u entries into %s" % (self.id, >>>> len(js_arr),self.dtype)) >>>> >>>> >>>> class SlurpThread(Thread): >>>> >>>> + def __init__(self, index): >>>> + self.id = index >>>> + super(SlurpThread, self).__init__() >>>> + >>>> + def printid(self,message): >>>> + print("%d: %s" % (self.id, message)) >>>> + >>>> def run(self): >>>> global block, y, es, lists, baddies, config, resendTo, timeout, >>>> dedupped, dedup >>>> ja = [] >>>> jas = [] >>>> - print("Thread started") >>>> + self.printid("Thread started") >>>> mla = None >>>> ml = "" >>>> mboxfile = "" >>>> @@ -152,16 +161,16 @@ class SlurpThread(Thread): >>>> archie = archiver.Archiver(parseHTML = parseHTML) >>>> >>>> while len(lists) > 0: >>>> - print("%u elements left to slurp" % len(lists)) >>>> + self.printid("%u elements left to slurp" % len(lists)) >>>> >>>> block.acquire() >>>> try: >>>> mla = lists.pop(0) >>>> if not mla: >>>> - print("Nothing more to do here") >>>> + self.printid("Nothing more to do here") >>>> return >>>> except Exception as err: >>>> - print("Could not pop list: %s" % err) >>>> + self.printid("Could not pop list: %s" % err) >>>> return >>>> finally: >>>> block.release() >>>> @@ -184,7 +193,7 @@ class SlurpThread(Thread): >>>> tmpname = mla[0] >>>> filename = mla[0] >>>> if filename.find(".gz") != -1: >>>> - print("Decompressing %s..." % filename) >>>> + self.printid("Decompressing %s..." 
% filename) >>>> try: >>>> with open(filename, "rb") as bf: >>>> bmd = bf.read() >>>> @@ -197,16 +206,16 @@ class SlurpThread(Thread): >>>> tmpname = tmpfile.name >>>> filename = tmpname >>>> dFile = True # Slated for deletion upon >>>> having been read >>>> - print("%s -> %u bytes" % (tmpname, len(bmd))) >>>> + self.printid("%s -> %u bytes" % (tmpname, >>>> len(bmd))) >>>> except Exception as err: >>>> - print("This wasn't a gzip file: %s" % err ) >>>> - print("Slurping %s" % filename) >>>> + self.printid("This wasn't a gzip file: %s" % err ) >>>> + self.printid("Slurping %s" % filename) >>>> messages = mailbox.mbox(tmpname) >>>> >>>> else: >>>> ml = mla[0] >>>> mboxfile = mla[1] >>>> - print("Slurping %s/%s" % (ml, mboxfile)) >>>> + self.printid("Slurping %s/%s" % (ml, mboxfile)) >>>> m = re.match(r"(\d\d\d\d)(\d\d)", mboxfile) >>>> EY = 1997 >>>> EM = 1 >>>> @@ -232,7 +241,7 @@ class SlurpThread(Thread): >>>> if fromFilter and 'from' in message and >>>> message['from'].find(fromFilter) == -1: >>>> continue >>>> if resendTo: >>>> - print("Delivering message %s via MTA" % >>>> message['message-id'] if 'message-id' in message else '??') >>>> + self.printid("Delivering message %s via MTA" % >>>> message['message-id'] if 'message-id' in message else '??') >>>> s = SMTP('localhost') >>>> try: >>>> if list_override: >>>> @@ -245,7 +254,7 @@ class SlurpThread(Thread): >>>> s.send_message(message, from_addr=None, >>>> to_addrs=(resendTo)) >>>> continue >>>> if (time.time() - stime > timeout): # break out after N >>>> seconds, it shouldn't take this long..! 
>>>> - print("Whoa, this is taking way too long, ignoring %s >>>> for now" % tmpname) >>>> + self.printid("Whoa, this is taking way too long, >>>> ignoring %s for now" % tmpname) >>>> break >>>> >>>> json, contents = archie.compute_updates(list_override, >>>> private, message) >>>> @@ -271,7 +280,7 @@ class SlurpThread(Thread): >>>> } >>>> ) >>>> if res and len(res['hits']['hits']) > 0: >>>> - print("Dedupping %s" % json['message-id']) >>>> + self.printid("Dedupping %s" % json['message-id']) >>>> dedupped += 1 >>>> continue >>>> >>>> @@ -305,43 +314,43 @@ class SlurpThread(Thread): >>>> if len(ja) >= 40: >>>> if not args.dry: >>>> bulk = BulkThread() >>>> - bulk.assign(ja, es, 'mbox') >>>> + bulk.assign(self.id, ja, es, 'mbox') >>>> bulk.insert() >>>> ja = [] >>>> >>>> if not args.dry: >>>> bulks = BulkThread() >>>> - bulks.assign(jas, es, 'mbox_source') >>>> + bulks.assign(self.id, jas, es, 'mbox_source') >>>> bulks.insert() >>>> jas = [] >>>> else: >>>> - print("Failed to parse: Return=%s Message-Id=%s" % >>>> (message.get('Return-Path'), message.get('Message-Id'))) >>>> + self.printid("Failed to parse: Return=%s >>>> Message-Id=%s" % (message.get('Return-Path'), message.get('Message-Id'))) >>>> bad += 1 >>>> >>>> if filebased: >>>> - print("Parsed %u records (failed: %u) from %s" % (count, >>>> bad, filename)) >>>> + self.printid("Parsed %u records (failed: %u) from %s" % >>>> (count, bad, filename)) >>>> if dFile: >>>> os.unlink(tmpname) >>>> elif imap: >>>> - print("Parsed %u records (failed: %u) from imap" % >>>> (count, bad)) >>>> + self.printid("Parsed %u records (failed: %u) from imap" % >>>> (count, bad)) >>>> else: >>>> - print("Parsed %s/%s: %u records (failed: %u) from %s" % >>>> (ml, mboxfile, count, bad, tmpname)) >>>> + self.printid("Parsed %s/%s: %u records (failed: %u) from >>>> %s" % (ml, mboxfile, count, bad, tmpname)) >>>> os.unlink(tmpname) >>>> >>>> y += count >>>> baddies += bad >>>> if not args.dry: >>>> bulk = BulkThread() >>>> - 
bulk.assign(ja, es, 'mbox') >>>> + bulk.assign(self.id, ja, es, 'mbox') >>>> bulk.insert() >>>> ja = [] >>>> >>>> if not args.dry: >>>> bulks = BulkThread() >>>> - bulks.assign(jas, es, 'mbox_source') >>>> + bulks.assign(self.id, jas, es, 'mbox_source') >>>> bulks.insert() >>>> jas = [] >>>> - print("Done, %u elements left to slurp" % len(lists)) >>>> + self.printid("Done, %u elements left to slurp" % len(lists)) >>>> >>>> parser = argparse.ArgumentParser(description='Command line options.') >>>> parser.add_argument('--source', dest='source', type=str, nargs=1, >>>> @@ -637,7 +646,7 @@ threads = [] >>>> cc = min(len(lists), int( multiprocessing.cpu_count() / 2) + 1) >>>> print("Starting up to %u threads to fetch the %u %s lists" % (cc, >>>> len(lists), project)) >>>> for i in range(1,cc+1): >>>> - t = SlurpThread() >>>> + t = SlurpThread(i) >>>> threads.append(t) >>>> t.start() >>>> print("Started no. %u" % i) >>>> >>>> >
