Repository: incubator-ponymail
Updated Branches:
refs/heads/master 1a3bff403 -> af1544e7b
import-mbox.py: prefix log messages with the thread number
This fixes #248
Project: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/commit/af1544e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/tree/af1544e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ponymail/diff/af1544e7
Branch: refs/heads/master
Commit: af1544e7b63d81a5998a4b3a1471586d63d72a4e
Parents: 1a3bff4
Author: Sebb <[email protected]>
Authored: Sun Nov 20 23:19:55 2016 +0000
Committer: Sebb <[email protected]>
Committed: Sun Nov 20 23:19:55 2016 +0000
----------------------------------------------------------------------
tools/import-mbox.py | 59 +++++++++++++++++++++++++++--------------------
1 file changed, 34 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ponymail/blob/af1544e7/tools/import-mbox.py
----------------------------------------------------------------------
diff --git a/tools/import-mbox.py b/tools/import-mbox.py
index 15f09ad..12bc0d1 100755
--- a/tools/import-mbox.py
+++ b/tools/import-mbox.py
@@ -107,7 +107,9 @@ es = Elasticsearch([
rootURL = ""
class BulkThread(Thread):
- def assign(self, json, xes, dtype = 'mbox', wc = 'quorum'):
+
+ def assign(self, id, json, xes, dtype = 'mbox', wc = 'quorum'):
+ self.id = id
self.json = json
self.xes = xes
self.dtype = dtype
@@ -133,17 +135,24 @@ class BulkThread(Thread):
try:
helpers.bulk(self.xes, js_arr)
except Exception as err:
- print("Warning: Could not bulk insert: %s" % err)
- #print("Inserted %u entries" % len(js_arr))
+ print("%d: Warning: Could not bulk insert: %s into %s" %
(self.id,err,self.dtype))
+# print("%d: Inserted %u entries into %s" % (self.id,
len(js_arr),self.dtype))
class SlurpThread(Thread):
+ def __init__(self, index):
+ self.id = index
+ super(SlurpThread, self).__init__()
+
+ def printid(self,message):
+ print("%d: %s" % (self.id, message))
+
def run(self):
global block, y, es, lists, baddies, config, resendTo, timeout,
dedupped, dedup
ja = []
jas = []
- print("Thread started")
+ self.printid("Thread started")
mla = None
ml = ""
mboxfile = ""
@@ -152,16 +161,16 @@ class SlurpThread(Thread):
archie = archiver.Archiver(parseHTML = parseHTML)
while len(lists) > 0:
- print("%u elements left to slurp" % len(lists))
+ self.printid("%u elements left to slurp" % len(lists))
block.acquire()
try:
mla = lists.pop(0)
if not mla:
- print("Nothing more to do here")
+ self.printid("Nothing more to do here")
return
except Exception as err:
- print("Could not pop list: %s" % err)
+ self.printid("Could not pop list: %s" % err)
return
finally:
block.release()
@@ -184,7 +193,7 @@ class SlurpThread(Thread):
tmpname = mla[0]
filename = mla[0]
if filename.find(".gz") != -1:
- print("Decompressing %s..." % filename)
+ self.printid("Decompressing %s..." % filename)
try:
with open(filename, "rb") as bf:
bmd = bf.read()
@@ -197,16 +206,16 @@ class SlurpThread(Thread):
tmpname = tmpfile.name
filename = tmpname
dFile = True # Slated for deletion upon having
been read
- print("%s -> %u bytes" % (tmpname, len(bmd)))
+ self.printid("%s -> %u bytes" % (tmpname,
len(bmd)))
except Exception as err:
- print("This wasn't a gzip file: %s" % err )
- print("Slurping %s" % filename)
+ self.printid("This wasn't a gzip file: %s" % err )
+ self.printid("Slurping %s" % filename)
messages = mailbox.mbox(tmpname)
else:
ml = mla[0]
mboxfile = mla[1]
- print("Slurping %s/%s" % (ml, mboxfile))
+ self.printid("Slurping %s/%s" % (ml, mboxfile))
m = re.match(r"(\d\d\d\d)(\d\d)", mboxfile)
EY = 1997
EM = 1
@@ -232,7 +241,7 @@ class SlurpThread(Thread):
if fromFilter and 'from' in message and
message['from'].find(fromFilter) == -1:
continue
if resendTo:
- print("Delivering message %s via MTA" %
message['message-id'] if 'message-id' in message else '??')
+ self.printid("Delivering message %s via MTA" %
message['message-id'] if 'message-id' in message else '??')
s = SMTP('localhost')
try:
if list_override:
@@ -245,7 +254,7 @@ class SlurpThread(Thread):
s.send_message(message, from_addr=None,
to_addrs=(resendTo))
continue
if (time.time() - stime > timeout): # break out after N
seconds, it shouldn't take this long..!
- print("Whoa, this is taking way too long, ignoring %s for
now" % tmpname)
+ self.printid("Whoa, this is taking way too long, ignoring %s
for now" % tmpname)
break
json, contents = archie.compute_updates(list_override,
private, message)
@@ -271,7 +280,7 @@ class SlurpThread(Thread):
}
)
if res and len(res['hits']['hits']) > 0:
- print("Dedupping %s" % json['message-id'])
+ self.printid("Dedupping %s" % json['message-id'])
dedupped += 1
continue
@@ -305,43 +314,43 @@ class SlurpThread(Thread):
if len(ja) >= 40:
if not args.dry:
bulk = BulkThread()
- bulk.assign(ja, es, 'mbox')
+ bulk.assign(self.id, ja, es, 'mbox')
bulk.insert()
ja = []
if not args.dry:
bulks = BulkThread()
- bulks.assign(jas, es, 'mbox_source')
+ bulks.assign(self.id, jas, es, 'mbox_source')
bulks.insert()
jas = []
else:
- print("Failed to parse: Return=%s Message-Id=%s" %
(message.get('Return-Path'), message.get('Message-Id')))
+ self.printid("Failed to parse: Return=%s Message-Id=%s" %
(message.get('Return-Path'), message.get('Message-Id')))
bad += 1
if filebased:
- print("Parsed %u records (failed: %u) from %s" % (count, bad,
filename))
+ self.printid("Parsed %u records (failed: %u) from %s" %
(count, bad, filename))
if dFile:
os.unlink(tmpname)
elif imap:
- print("Parsed %u records (failed: %u) from imap" % (count,
bad))
+ self.printid("Parsed %u records (failed: %u) from imap" %
(count, bad))
else:
- print("Parsed %s/%s: %u records (failed: %u) from %s" % (ml,
mboxfile, count, bad, tmpname))
+ self.printid("Parsed %s/%s: %u records (failed: %u) from %s" %
(ml, mboxfile, count, bad, tmpname))
os.unlink(tmpname)
y += count
baddies += bad
if not args.dry:
bulk = BulkThread()
- bulk.assign(ja, es, 'mbox')
+ bulk.assign(self.id, ja, es, 'mbox')
bulk.insert()
ja = []
if not args.dry:
bulks = BulkThread()
- bulks.assign(jas, es, 'mbox_source')
+ bulks.assign(self.id, jas, es, 'mbox_source')
bulks.insert()
jas = []
- print("Done, %u elements left to slurp" % len(lists))
+ self.printid("Done, %u elements left to slurp" % len(lists))
parser = argparse.ArgumentParser(description='Command line options.')
parser.add_argument('--source', dest='source', type=str, nargs=1,
@@ -637,7 +646,7 @@ threads = []
cc = min(len(lists), int( multiprocessing.cpu_count() / 2) + 1)
print("Starting up to %u threads to fetch the %u %s lists" % (cc, len(lists),
project))
for i in range(1,cc+1):
- t = SlurpThread()
+ t = SlurpThread(i)
threads.append(t)
t.start()
print("Started no. %u" % i)