Sorry about that, I decided to change the thread id to its name and did not change all the references. Should be OK now.
Going back to the original encoding issue: I have tried and failed to reproduce it. Can you find out which mbox caused the problem so I can take a look? On 22 November 2016 at 07:23, Francesco Chicchiriccò <[email protected]> wrote: > Hi all, > after latest commits, I get now the following error when importing from > mbox: > > Exception in thread Thread-1: > Traceback (most recent call last): > File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner > self.run() > File "import-mbox.py", line 314, in run > bulk.assign(self.id, ja, es, 'mbox') > AttributeError: 'SlurpThread' object has no attribute 'id' > > Regards. > > > On 21/11/2016 17:19, sebb wrote: >> >> On 21 November 2016 at 11:52, Daniel Gruno <[email protected]> wrote: >>> >>> On 11/21/2016 12:50 PM, sebb wrote: >>>> >>>> On 21 November 2016 at 11:40, Francesco Chicchiriccò >>>> <[email protected]> wrote: >>>>> >>>>> Hi all, >>>>> not sure but it seems that the commit below broke my scheduled import >>>>> from mbox: >>>> >>>> It won't be that commit, most likely the fix for #251 >>>> >>>> >>>> https://github.com/apache/incubator-ponymail/commit/1a3bff403166c917738fd02acefc988b909d4eae#diff-0102373f79eaa72ffaff3ce7675b6a43 >>>> >>>> This presumably means the archiver would have fallen over with the same >>>> e-mail. >>>> Or there is an encoding problem with writing the mail to the mbox - or >>>> reading it - so the importer is not seeing the same input as the >>>> archiver. >>> >>> The importer usually sees things as ASCII, whereas the archiver _can_ >>> get fed input as unicode by postfix (I don't know why, but there it is). >>> This may explain why. I think as_bytes is a safer way to archive, as >>> it's binary. >> >> That all depends how the binary is generated. >> As far as I can tell, the parsed message is not stored as binary, so >> it has to be encoded to create the bytes. >> >>>> It would be useful to know what the message is that causes the issue. >>>> >>>> If you can find it I can take a look later. >>>> >>>>> Exception in thread Thread-1: >>>>> Traceback (most recent call last): >>>>> File "/usr/lib/python3.5/threading.py", line 914, in >>>>> _bootstrap_inner >>>>> self.run() >>>>> File "import-mbox.py", line 297, in run >>>>> 'source': message.as_string() >>>>> File "/usr/lib/python3.5/email/message.py", line 159, in as_string >>>>> g.flatten(self, unixfrom=unixfrom) >>>>> File "/usr/lib/python3.5/email/generator.py", line 115, in flatten >>>>> self._write(msg) >>>>> File "/usr/lib/python3.5/email/generator.py", line 181, in _write >>>>> self._dispatch(msg) >>>>> File "/usr/lib/python3.5/email/generator.py", line 214, in _dispatch >>>>> meth(msg) >>>>> File "/usr/lib/python3.5/email/generator.py", line 243, in >>>>> _handle_text >>>>> msg.set_payload(payload, charset) >>>>> File "/usr/lib/python3.5/email/message.py", line 316, in set_payload >>>>> payload = payload.encode(charset.output_charset) >>>>> UnicodeEncodeError: 'ascii' codec can't encode character '\ufffd' in >>>>> position 3657: ordinal not in range(128) >>>>> >>>>> Any hint / workaround? >>>>> >>>>> On 2016-11-21 00:20 (+0100), [email protected] wrote: >>>>>> >>>>>> Repository: incubator-ponymail >>>>>> Updated Branches: >>>>>> refs/heads/master 1a3bff403 -> af1544e7b >>>>>> >>>>>> >>>>>> import-mbox.py messages need the thread number >>>>>> >>>>>> This fixes #248 >>>>>> >>>>>> Project: >>>>>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/repo >>>>>> Commit: >>>>>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/commit/af1544e7 >>>>>> Tree: >>>>>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/tree/af1544e7 >>>>>> Diff: >>>>>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/diff/af1544e7 >>>>>> >>>>>> Branch: refs/heads/master >>>>>> Commit: af1544e7b63d81a5998a4b3a1471586d63d72a4e >>>>>> Parents: 1a3bff4 >>>>>> Author: Sebb <[email protected]> >>>>>> Authored: Sun Nov 20 23:19:55 2016 +0000 >>>>>> Committer: Sebb <[email protected]> >>>>>> Committed: Sun Nov 20 23:19:55 2016 +0000 >>>>>> >>>>>> ---------------------------------------------------------------------- >>>>>> tools/import-mbox.py | 59 >>>>>> +++++++++++++++++++++++++++-------------------- >>>>>> 1 file changed, 34 insertions(+), 25 deletions(-) >>>>>> ---------------------------------------------------------------------- >>>>>> >>>>>> >>>>>> >>>>>> http://git-wip-us.apache.org/repos/asf/incubator-ponymail/blob/af1544e7/tools/import-mbox.py >>>>>> ---------------------------------------------------------------------- >>>>>> diff --git a/tools/import-mbox.py b/tools/import-mbox.py >>>>>> index 15f09ad..12bc0d1 100755 >>>>>> --- a/tools/import-mbox.py >>>>>> +++ b/tools/import-mbox.py >>>>>> @@ -107,7 +107,9 @@ es = Elasticsearch([ >>>>>> rootURL = "" >>>>>> >>>>>> class BulkThread(Thread): >>>>>> - def assign(self, json, xes, dtype = 'mbox', wc = 'quorum'): >>>>>> + >>>>>> + def assign(self, id, json, xes, dtype = 'mbox', wc = 'quorum'): >>>>>> + self.id = id >>>>>> self.json = json >>>>>> self.xes = xes >>>>>> self.dtype = dtype >>>>>> @@ -133,17 +135,24 @@ class BulkThread(Thread): >>>>>> try: >>>>>> helpers.bulk(self.xes, js_arr) >>>>>> except Exception as err: >>>>>> - print("Warning: Could not bulk insert: %s" % err) >>>>>> - #print("Inserted %u entries" % len(js_arr)) >>>>>> + print("%d: Warning: Could not bulk insert: %s into %s" % >>>>>> (self.id,err,self.dtype)) >>>>>> +# print("%d: Inserted %u entries into %s" % (self.id, >>>>>> len(js_arr),self.dtype)) >>>>>> >>>>>> >>>>>> class SlurpThread(Thread): >>>>>> >>>>>> + def __init__(self, index): >>>>>> + self.id = index >>>>>> + super(SlurpThread, self).__init__() >>>>>> + >>>>>> + def printid(self,message): >>>>>> + print("%d: %s" % (self.id, message)) >>>>>> + >>>>>> def run(self): >>>>>> global block, y, es, lists, baddies, config, resendTo, >>>>>> timeout, dedupped, dedup >>>>>> ja = [] >>>>>> jas = [] >>>>>> - print("Thread started") >>>>>> + self.printid("Thread started") >>>>>> mla = None >>>>>> ml = "" >>>>>> mboxfile = "" >>>>>> @@ -152,16 +161,16 @@ class SlurpThread(Thread): >>>>>> archie = archiver.Archiver(parseHTML = parseHTML) >>>>>> >>>>>> while len(lists) > 0: >>>>>> - print("%u elements left to slurp" % len(lists)) >>>>>> + self.printid("%u elements left to slurp" % len(lists)) >>>>>> >>>>>> block.acquire() >>>>>> try: >>>>>> mla = lists.pop(0) >>>>>> if not mla: >>>>>> - print("Nothing more to do here") >>>>>> + self.printid("Nothing more to do here") >>>>>> return >>>>>> except Exception as err: >>>>>> - print("Could not pop list: %s" % err) >>>>>> + self.printid("Could not pop list: %s" % err) >>>>>> return >>>>>> finally: >>>>>> block.release() >>>>>> @@ -184,7 +193,7 @@ class SlurpThread(Thread): >>>>>> tmpname = mla[0] >>>>>> filename = mla[0] >>>>>> if filename.find(".gz") != -1: >>>>>> - print("Decompressing %s..." % filename) >>>>>> + self.printid("Decompressing %s..." % filename) >>>>>> try: >>>>>> with open(filename, "rb") as bf: >>>>>> bmd = bf.read() >>>>>> @@ -197,16 +206,16 @@ class SlurpThread(Thread): >>>>>> tmpname = tmpfile.name >>>>>> filename = tmpname >>>>>> dFile = True # Slated for deletion upon >>>>>> having been read >>>>>> - print("%s -> %u bytes" % (tmpname, >>>>>> len(bmd))) >>>>>> + self.printid("%s -> %u bytes" % (tmpname, >>>>>> len(bmd))) >>>>>> except Exception as err: >>>>>> - print("This wasn't a gzip file: %s" % err ) >>>>>> - print("Slurping %s" % filename) >>>>>> + self.printid("This wasn't a gzip file: %s" % >>>>>> err ) >>>>>> + self.printid("Slurping %s" % filename) >>>>>> messages = mailbox.mbox(tmpname) >>>>>> >>>>>> else: >>>>>> ml = mla[0] >>>>>> mboxfile = mla[1] >>>>>> - print("Slurping %s/%s" % (ml, mboxfile)) >>>>>> + self.printid("Slurping %s/%s" % (ml, mboxfile)) >>>>>> m = re.match(r"(\d\d\d\d)(\d\d)", mboxfile) >>>>>> EY = 1997 >>>>>> EM = 1 >>>>>> @@ -232,7 +241,7 @@ class SlurpThread(Thread): >>>>>> if fromFilter and 'from' in message and >>>>>> message['from'].find(fromFilter) == -1: >>>>>> continue >>>>>> if resendTo: >>>>>> - print("Delivering message %s via MTA" % >>>>>> message['message-id'] if 'message-id' in message else '??') >>>>>> + self.printid("Delivering message %s via MTA" % >>>>>> message['message-id'] if 'message-id' in message else '??') >>>>>> s = SMTP('localhost') >>>>>> try: >>>>>> if list_override: >>>>>> @@ -245,7 +254,7 @@ class SlurpThread(Thread): >>>>>> s.send_message(message, from_addr=None, >>>>>> to_addrs=(resendTo)) >>>>>> continue >>>>>> if (time.time() - stime > timeout): # break out >>>>>> after N seconds, it shouldn't take this long..! >>>>>> - print("Whoa, this is taking way too long, >>>>>> ignoring %s for now" % tmpname) >>>>>> + self.printid("Whoa, this is taking way too long, >>>>>> ignoring %s for now" % tmpname) >>>>>> break >>>>>> >>>>>> json, contents = >>>>>> archie.compute_updates(list_override, private, message) >>>>>> @@ -271,7 +280,7 @@ class SlurpThread(Thread): >>>>>> } >>>>>> ) >>>>>> if res and len(res['hits']['hits']) > 0: >>>>>> - print("Dedupping %s" % json['message-id']) >>>>>> + self.printid("Dedupping %s" % >>>>>> json['message-id']) >>>>>> dedupped += 1 >>>>>> continue >>>>>> >>>>>> @@ -305,43 +314,43 @@ class SlurpThread(Thread): >>>>>> if len(ja) >= 40: >>>>>> if not args.dry: >>>>>> bulk = BulkThread() >>>>>> - bulk.assign(ja, es, 'mbox') >>>>>> + bulk.assign(self.id, ja, es, 'mbox') >>>>>> bulk.insert() >>>>>> ja = [] >>>>>> >>>>>> if not args.dry: >>>>>> bulks = BulkThread() >>>>>> - bulks.assign(jas, es, 'mbox_source') >>>>>> + bulks.assign(self.id, jas, es, >>>>>> 'mbox_source') >>>>>> bulks.insert() >>>>>> jas = [] >>>>>> else: >>>>>> - print("Failed to parse: Return=%s Message-Id=%s" >>>>>> % (message.get('Return-Path'), message.get('Message-Id'))) >>>>>> + self.printid("Failed to parse: Return=%s >>>>>> Message-Id=%s" % (message.get('Return-Path'), message.get('Message-Id'))) >>>>>> bad += 1 >>>>>> >>>>>> if filebased: >>>>>> - print("Parsed %u records (failed: %u) from %s" % >>>>>> (count, bad, filename)) >>>>>> + self.printid("Parsed %u records (failed: %u) from %s" >>>>>> % (count, bad, filename)) >>>>>> if dFile: >>>>>> os.unlink(tmpname) >>>>>> elif imap: >>>>>> - print("Parsed %u records (failed: %u) from imap" % >>>>>> (count, bad)) >>>>>> + self.printid("Parsed %u records (failed: %u) from >>>>>> imap" % (count, bad)) >>>>>> else: >>>>>> - print("Parsed %s/%s: %u records (failed: %u) from %s" >>>>>> % (ml, mboxfile, count, bad, tmpname)) >>>>>> + self.printid("Parsed %s/%s: %u records (failed: %u) >>>>>> from %s" % (ml, mboxfile, count, bad, tmpname)) >>>>>> os.unlink(tmpname) >>>>>> >>>>>> y += count >>>>>> baddies += bad >>>>>> if not args.dry: >>>>>> bulk = BulkThread() >>>>>> - bulk.assign(ja, es, 'mbox') >>>>>> + bulk.assign(self.id, ja, es, 'mbox') >>>>>> bulk.insert() >>>>>> ja = [] >>>>>> >>>>>> if not args.dry: >>>>>> bulks = BulkThread() >>>>>> - bulks.assign(jas, es, 'mbox_source') >>>>>> + bulks.assign(self.id, jas, es, 'mbox_source') >>>>>> bulks.insert() >>>>>> jas = [] >>>>>> - print("Done, %u elements left to slurp" % len(lists)) >>>>>> + self.printid("Done, %u elements left to slurp" % len(lists)) >>>>>> >>>>>> parser = argparse.ArgumentParser(description='Command line >>>>>> options.') >>>>>> parser.add_argument('--source', dest='source', type=str, nargs=1, >>>>>> @@ -637,7 +646,7 @@ threads = [] >>>>>> cc = min(len(lists), int( multiprocessing.cpu_count() / 2) + 1) >>>>>> print("Starting up to %u threads to fetch the %u %s lists" % (cc, >>>>>> len(lists), project)) >>>>>> for i in range(1,cc+1): >>>>>> - t = SlurpThread() >>>>>> + t = SlurpThread(i) >>>>>> threads.append(t) >>>>>> t.start() >>>>>> print("Started no. %u" % i) >>>>>> >>>>>> > > > > -- > Francesco Chicchiriccò > > Tirasa - Open Source Excellence > http://www.tirasa.net/ > > Member at The Apache Software Foundation > Syncope, Cocoon, Olingo, CXF, OpenJPA, PonyMail > http://home.apache.org/~ilgrosso/ >
