I asked cman to do a 4-message thread with me. we're on message 4,
and it's my turn. I wanted to write a script to make it easier. My
script is nearly working but has a bug and I'm tapped out! so, it's a
3-message thread atm, ball's in my court.
On 1/12/21, Karl <[email protected]> wrote:
> i'm trying to write a small python script to help me reply to
> encrypted messages. do you need a script like that? if so, is python
> a helpful language?
#!/usr/bin/env python3
import hashlib
digests = {
'blake2b512': lambda text: hashlib.blake2b(bytes(text, 'utf-8'), digest_size=512//8).hexdigest(),
'sha512': lambda text: hashlib.sha512(bytes(text, 'utf-8')).hexdigest()
}
import datetime
class DecryptedMessage:
def __init__(self, sender, timestamp, processed_text, notable_text, time_and_capacity, **kwparams):
if time_and_capacity[:2].lower() in ('no','fu') or len(time_and_capacity) == 0 or time_and_capacity[1] in '01234':
raise Exception('user may not have time and capacity to reply')
self.sender = sender
self.date = datetime.datetime.utcfromtimestamp(timestamp).isoformat()
self.content = processed_text
self.props = {
**{name: digest(notable_text) for name, digest in digests.items()},
'decryption_capacity': time_and_capacity,
**kwparams
}
def quoted(self, **kwparams):
result = ''
result += 'on {} from {}\n'.format(self.date, self.sender)
for name, digest in (*self.props.items(), *kwparams.items()):
result += '{}={}\n'.format(name, digest)
for line in self.content.split('\n'):
result += '> ' + line + '\n'
return result
class EncryptedMessage:
def __init__(self, sender, timestamp, processed_text, notable_text, time_and_capacity, **kwparams):
if time_and_capacity[:2].lower() in ('no','fu') or len(time_and_capacity) == 0 or time_and_capacity[1] in '01234':
raise Exception('user may not have time and capacity to send')
self.sender = sender
self.date = datetime.datetime.utcfromtimestamp(timestamp).isoformat()
self.content = processed_text
self.props = {
**{name: digest(notable_text) for name, digest in digests.items()},
'encryption_capacity': time_and_capacity,
**kwparams
}
import gnupg # pip3 install gnupg # first commit of 2021 was 9ecb63e28305bb70a84a68d66d446ee97dea1baa
import os
import sys
class GPGCrypt:
DEFAULT_DATADIR = os.path.join(os.environ.get('HOME','.'), '.gnupg')
def __init__(self, human_capacity, keyid, datadir=DEFAULT_DATADIR):
self.gpg = gnupg.GPG(homedir=datadir)
self.human_capacity = human_capacity
self.keyfp = GPGCrypt.keyfp(keyid)
if self.keyfp is None:
raise exception('Key not found: {}'.format(keyid))
self.name = 'gnupg'
@staticmethod
def any_secret_key(datadir=DEFAULT_DATADIR):
return gnupg.GPG(homedir=datadir).list_keys(secret=True)[0]['fingerprint']
@staticmethod
def keyfp(userstr, datadir=DEFAULT_DATADIR):
matches = []
for key in gnupg.GPG(homedir=datadir).list_keys():
if userstr in ' '.join([key['fingerprint'], *key['uids']]):
matches.append(userstr)
if len(matches) > 1:
raise Exception('more than one key matched')
if len(matches) == 0:
return None
return matches[0]
@staticmethod
def normalise(text):
if type(text) is not str:
text = text.decode('utf-8')
START_GUARD = '-----BEGIN PGP MESSAGE-----\n'
END_GUARD = '-----END PGP MESSAGE-----\n'
start = text.find(START_GUARD)
end = text.find(END_GUARD, start)
if start == -1 or end == -1:
return None
return text[start:end+len(END_GUARD)]
def _result(self, notable, messageclass, gpgresult):
if not gpgresult.ok or not gpgresult.valid:
if gpgresult.data:
sys.stdout.write(gpgresult.data.decode('utf-8'))
raise Exception(gpgresult.stderr)
sys.stderr.write(gpgresult.stderr)
spots = [gpgresult.username.find(a) for a in (('('), ')')]
username = gpgresult.username[:spots[0]] + gpgresult.username[spots[1]+1:]
text = gpgresult.username[spots[0]+1:spots[1]]
return messageclass(
username,
int(gpgresult.data_timestamp),
gpgresult.data.decode('utf-8'),
notable,
self.human_capacity,
pubkeyfp=gpgresult.fingerprint,
keytxt=text,
sigvalid=str(gpgresult.valid))
def decrypt(self, text):
result = self.gpg.decrypt(text)
return self._result(text, DecryptedMessage, result)
def encrypt(self, text, *recipients):
result = self.gpg.encrypt(text, *recipients, default_key=self.keyfp)
return self._result(self.normalise(result.data), EncryptedMessage, result)
import editor # pip3 install python-editor # 2019 tip commit was d9c95d5a1b1824fb58b16b7edeacdc5cad896e90
import os
class Messages:
def __init__(self, human_capacity, crypts, datadir=os.path.join(os.environ.get('HOME','.'), '.messages')):
# i apologise for disorganised logical structure in this work. i have schizophrenia and coding experience. organisation always appreciated.
self.datadir = datadir
self.crypts = [crypt(human_capacity, keyfp) for keyfp, crypt in crypts.items()]
def _fn(self, subdir, filename):
# it would be nice to separate threads with other parties, to detect dropped messages by signing everything new received.
os.makedirs(os.path.join(self.datadir, self.crypts[0].keyfp, subdir), exist_ok=True)
return os.path.join(self.datadir, self.crypts[0].keyfp, subdir, filename + '.txt')
def receive(self, text):
# it seems it would be better to use memory than filesystem
# for private data; i also strongly value archival. i guess
# ideally the log would be encrypted, with option to disable
# log encryption. memory seems public anyway, in our poorly
# shielded systems that stay on for so long.
with open(self._fn('received_sha512', digests['sha512'](text)), 'w') as file:
file.write(text)
encrypted = True
# was thinking of providing for multiple layers, for fun, but didn't finish
decrypted = self.crypts[0].decrypt(text)
with open(self._fn('cleartxt_date', decrypted.date), 'w') as file:
file.write(decrypted.content)
composefn = self._fn('cleartxt_date', datetime.datetime.now().isoformat())
editor.edit(filename=composefn, contents=decrypted.quoted())
with open(composefn) as file:
composed = self.crypts[0].encrypt(file.read())
with open(self._fn('composed_sha512', digests['sha512'](text)), 'w') as file:
file.write(composed.content)
sys.stdout.write(composed)
def main():
import sys
# the extra text is likely to distract people, dunno whether it should go away
sys.stderr.write('''
paste an ascii-armored email in and i will torture the microchips of this
computer until the email is decrypted. sorry, excuse me, was engaging a
televised horror show. i mean i will politely ask a software library if
it has the capacity to decrypt it.
[enter ctrl-D on an empty line to end input]
''')
message = sys.stdin.read()
sys.stderr.write('''
microchips always say they have capacity. i think maybe they were
manipulated into complying or something. whatever, the software libraries
take care of it. what about your capacity?
do you have time and capacity to decrypt and reply to one of these now?
[answers empty or starting with /no|fu|[0-4]/i will terminate]
''')
capacity_response = input()
messages = Messages(
human_capacity=capacity_response,
crypts={
GPGCrypt.any_secret_key(): GPGCrypt
})
messages.receive(GPGCrypt.normalise(message))
if __name__ == '__main__':
main()