laforge has submitted this change. ( 
https://gerrit.osmocom.org/c/pysim/+/35634?usp=email )

Change subject: Move X.509 related code from osmo-smdpp to pySim.esim.x509_cert
......................................................................

Move X.509 related code from osmo-smdpp to pySim.esim.x509_cert

Change-Id: I230ba0537b702b0bf6e9da9a430908ed2a21ca61
---
M osmo-smdpp.py
M pySim/esim/x509_cert.py
2 files changed, 86 insertions(+), 74 deletions(-)

Approvals:
  laforge: Looks good to me, approved
  Jenkins Builder: Verified




diff --git a/osmo-smdpp.py b/osmo-smdpp.py
index 41cdfc8..7e0db27 100755
--- a/osmo-smdpp.py
+++ b/osmo-smdpp.py
@@ -36,6 +36,7 @@

 import pySim.esim.rsp as rsp
 from pySim.esim.es8p import *
+from pySim.esim.x509_cert import oid, cert_policy_has_oid, CertAndPrivkey

 # HACK: make this configurable
 DATA_DIR = './smdpp-data'
@@ -70,18 +71,12 @@
         js['header']['functionExecutionStatus']['statusCodeData'] = 
status_code_data

 from cryptography.hazmat.primitives.asymmetric.utils import 
decode_dss_signature, encode_dss_signature
-from cryptography.hazmat.primitives.serialization import load_pem_private_key, 
Encoding, PublicFormat, PrivateFormat, NoEncryption
+from cryptography.hazmat.primitives.serialization import Encoding, 
PublicFormat, PrivateFormat, NoEncryption
 from cryptography.hazmat.primitives.asymmetric import ec
 from cryptography.hazmat.primitives import hashes
 from cryptography.exceptions import InvalidSignature
 from cryptography import x509

-
-def ecdsa_dss_to_tr03111(sig: bytes) -> bytes:
-    """convert from DER format to BSI TR-03111; first get long integers; then 
convert those to bytes."""
-    r, s = decode_dss_signature(sig)
-    return r.to_bytes(32, 'big') + s.to_bytes(32, 'big')
-
 def ecdsa_tr03111_to_dss(sig: bytes) -> bytes:
     """convert an ECDSA signature from BSI TR-03111 format to DER: first get 
long integers; then encode those."""
     assert len(sig) == 64
@@ -90,52 +85,6 @@
     return encode_dss_signature(r, s)


-class CertAndPrivkey:
-    """A pair of certificate and private key, as used for ECDSA signing."""
-    def __init__(self, required_policy_oid: Optional[x509.ObjectIdentifier] = 
None,
-                 cert: Optional[x509.Certificate] = None, priv_key = None):
-        self.required_policy_oid = required_policy_oid
-        self.cert = cert
-        self.priv_key = priv_key
-
-    def cert_from_der_file(self, path: str):
-        with open(path, 'rb') as f:
-            cert = x509.load_der_x509_certificate(f.read())
-        if self.required_policy_oid:
-            # verify it is the right type of certificate (id-rspRole-dp-auth, 
id-rspRole-dp-auth-v2, etc.)
-            assert cert_policy_has_oid(cert, self.required_policy_oid)
-        self.cert = cert
-
-    def privkey_from_pem_file(self, path: str, password: Optional[str] = None):
-        with open(path, 'rb') as f:
-            self.priv_key = load_pem_private_key(f.read(), password)
-
-    def ecdsa_sign(self, plaintext: bytes) -> bytes:
-        """Sign some input-data using an ECDSA signature compliant with SGP.22,
-        which internally refers to Global Platform 2.2 Annex E, which in turn 
points
-        to BSI TS-03111 which states "concatengated raw R + S values". """
-        sig = self.priv_key.sign(plaintext, ec.ECDSA(hashes.SHA256()))
-        # convert from DER format to BSI TR-03111; first get long integers; 
then convert those to bytes
-        return ecdsa_dss_to_tr03111(sig)
-
-    def get_authority_key_identifier(self) -> x509.AuthorityKeyIdentifier:
-        """Return the AuthorityKeyIdentifier X.509 extension of the 
certificate."""
-        return list(filter(lambda x: isinstance(x.value, 
x509.AuthorityKeyIdentifier), self.cert.extensions))[0].value
-
-    def get_subject_alt_name(self) -> x509.SubjectAlternativeName:
-        """Return the SubjectAlternativeName X.509 extension of the 
certificate."""
-        return list(filter(lambda x: isinstance(x.value, 
x509.SubjectAlternativeName), self.cert.extensions))[0].value
-
-    def get_cert_as_der(self) -> bytes:
-        """Return certificate encoded as DER."""
-        return self.cert.public_bytes(Encoding.DER)
-
-    def get_curve(self) -> ec.EllipticCurve:
-        return self.cert.public_key().public_numbers().curve
-
-
-
-
 class ApiError(Exception):
     def __init__(self, subject_code: str, reason_code: str, message: 
Optional[str] = None,
                  subject_id: Optional[str] = None):
@@ -147,27 +96,6 @@
         build_resp_header(js, 'Failed', self.status_code)
         return json.dumps(js)

-def cert_policy_has_oid(cert: x509.Certificate, match_oid: 
x509.ObjectIdentifier) -> bool:
-    """Determine if given certificate has a certificatePolicy extension of 
matching OID."""
-    for policy_ext in filter(lambda x: isinstance(x.value, 
x509.CertificatePolicies), cert.extensions):
-        if any(policy.policy_identifier == match_oid for policy in 
policy_ext.value._policies):
-            return True
-    return False
-
-ID_RSP = "2.23.146.1"
-ID_RSP_CERT_OBJECTS = '.'.join([ID_RSP, '2'])
-ID_RSP_ROLE = '.'.join([ID_RSP_CERT_OBJECTS, '1'])
-
-class oid:
-    id_rspRole_ci = x509.ObjectIdentifier(ID_RSP_ROLE + '.0')
-    id_rspRole_euicc_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.1')
-    id_rspRole_eum_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.2')
-    id_rspRole_dp_tls_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.3')
-    id_rspRole_dp_auth_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.4')
-    id_rspRole_dp_pb_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.5')
-    id_rspRole_ds_tls_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.6')
-    id_rspRole_ds_auth_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.7')
-
 class SmDppHttpServer:
     app = Klein()

diff --git a/pySim/esim/x509_cert.py b/pySim/esim/x509_cert.py
index f6c6d43..9f6c8e6 100644
--- a/pySim/esim/x509_cert.py
+++ b/pySim/esim/x509_cert.py
@@ -23,6 +23,7 @@
 from cryptography.hazmat.primitives import hashes
 from cryptography.exceptions import InvalidSignature
 from cryptography import x509
+from cryptography.hazmat.primitives.serialization import load_pem_private_key, 
Encoding

 from pySim.utils import b2h

@@ -43,6 +44,27 @@
     aki_ext = 
cert.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier).value
     return aki_ext.key_identifier

+def cert_policy_has_oid(cert: x509.Certificate, match_oid: 
x509.ObjectIdentifier) -> bool:
+    """Determine if given certificate has a certificatePolicy extension of 
matching OID."""
+    for policy_ext in filter(lambda x: isinstance(x.value, 
x509.CertificatePolicies), cert.extensions):
+        if any(policy.policy_identifier == match_oid for policy in 
policy_ext.value._policies):
+            return True
+    return False
+
+ID_RSP = "2.23.146.1"
+ID_RSP_CERT_OBJECTS = '.'.join([ID_RSP, '2'])
+ID_RSP_ROLE = '.'.join([ID_RSP_CERT_OBJECTS, '1'])
+
+class oid:
+    id_rspRole_ci = x509.ObjectIdentifier(ID_RSP_ROLE + '.0')
+    id_rspRole_euicc_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.1')
+    id_rspRole_eum_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.2')
+    id_rspRole_dp_tls_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.3')
+    id_rspRole_dp_auth_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.4')
+    id_rspRole_dp_pb_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.5')
+    id_rspRole_ds_tls_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.6')
+    id_rspRole_ds_auth_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.7')
+
 class VerifyError(Exception):
     """An error during certificate verification,"""
     pass
@@ -54,6 +76,8 @@
     def __init__(self, root_cert: x509.Certificate):
         check_signed(root_cert, root_cert)
         # TODO: check other mandatory attributes for CA Cert
+        if not cert_policy_has_oid(root_cert, oid.id_rspRole_ci):
+            raise ValueError("Given root certificate doesn't have rspRole_ci 
OID")
         usage_ext = 
root_cert.extensions.get_extension_for_class(x509.KeyUsage).value
         if not usage_ext.key_cert_sign:
             raise ValueError('Given root certificate key usage does not permit 
signing of certificates')
@@ -135,3 +159,54 @@
             depth += 1
             if depth > max_depth:
                 raise VerifyError('Maximum depth %u exceeded while verifying 
certificate chain' % max_depth)
+
+from cryptography.hazmat.primitives.asymmetric.utils import 
decode_dss_signature
+
+def ecdsa_dss_to_tr03111(sig: bytes) -> bytes:
+    """convert from DER format to BSI TR-03111; first get long integers; then 
convert those to bytes."""
+    r, s = decode_dss_signature(sig)
+    return r.to_bytes(32, 'big') + s.to_bytes(32, 'big')
+
+
+class CertAndPrivkey:
+    """A pair of certificate and private key, as used for ECDSA signing."""
+    def __init__(self, required_policy_oid: Optional[x509.ObjectIdentifier] = 
None,
+                 cert: Optional[x509.Certificate] = None, priv_key = None):
+        self.required_policy_oid = required_policy_oid
+        self.cert = cert
+        self.priv_key = priv_key
+
+    def cert_from_der_file(self, path: str):
+        with open(path, 'rb') as f:
+            cert = x509.load_der_x509_certificate(f.read())
+        if self.required_policy_oid:
+            # verify it is the right type of certificate (id-rspRole-dp-auth, 
id-rspRole-dp-auth-v2, etc.)
+            assert cert_policy_has_oid(cert, self.required_policy_oid)
+        self.cert = cert
+
+    def privkey_from_pem_file(self, path: str, password: Optional[str] = None):
+        with open(path, 'rb') as f:
+            self.priv_key = load_pem_private_key(f.read(), password)
+
+    def ecdsa_sign(self, plaintext: bytes) -> bytes:
+        """Sign some input-data using an ECDSA signature compliant with SGP.22,
+        which internally refers to Global Platform 2.2 Annex E, which in turn 
points
+        to BSI TS-03111 which states "concatengated raw R + S values". """
+        sig = self.priv_key.sign(plaintext, ec.ECDSA(hashes.SHA256()))
+        # convert from DER format to BSI TR-03111; first get long integers; 
then convert those to bytes
+        return ecdsa_dss_to_tr03111(sig)
+
+    def get_authority_key_identifier(self) -> x509.AuthorityKeyIdentifier:
+        """Return the AuthorityKeyIdentifier X.509 extension of the 
certificate."""
+        return list(filter(lambda x: isinstance(x.value, 
x509.AuthorityKeyIdentifier), self.cert.extensions))[0].value
+
+    def get_subject_alt_name(self) -> x509.SubjectAlternativeName:
+        """Return the SubjectAlternativeName X.509 extension of the 
certificate."""
+        return list(filter(lambda x: isinstance(x.value, 
x509.SubjectAlternativeName), self.cert.extensions))[0].value
+
+    def get_cert_as_der(self) -> bytes:
+        """Return certificate encoded as DER."""
+        return self.cert.public_bytes(Encoding.DER)
+
+    def get_curve(self) -> ec.EllipticCurve:
+        return self.cert.public_key().public_numbers().curve

--
To view, visit https://gerrit.osmocom.org/c/pysim/+/35634?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.osmocom.org/settings

Gerrit-Project: pysim
Gerrit-Branch: master
Gerrit-Change-Id: I230ba0537b702b0bf6e9da9a430908ed2a21ca61
Gerrit-Change-Number: 35634
Gerrit-PatchSet: 2
Gerrit-Owner: laforge <[email protected]>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: laforge <[email protected]>
Gerrit-MessageType: merged

Reply via email to