dexter has uploaded this change for review. (
https://gerrit.osmocom.org/c/pysim/+/41983?usp=email )
Change subject: tests/pySim-smpp2sim_test: add integration test
......................................................................
tests/pySim-smpp2sim_test: add integration test
At the moment pySim.ota codebase is not covered by any of the
integration tests (we have only normal unittests so far). To
increase the test coverage, let's add an integration test that
sends exchanges an RFM OTA-SMS with a real-world card.
However, there is no tool avaliable that can be used as an SMPP
client for pySim-smpp2sim yet. Let's use smpp_ota_apdu2.py on
laforge/ota to develop a tool that we can use to exchange SMS-TPDUs
that contain remote APDU scripts (RFM/RAM).
Finally let's use the tool we have created as a basis to create
an integration test that exchanges an SMS-TPDU with the RFM
application of a sysmoISIM-SJA5 card. The testcase shall pass
when we get the expected response from the card.
Related: OS#6868
Change-Id: If25e38be004cc1c7aeeb130431831377e78fe28d
---
M contrib/jenkins.sh
A contrib/smpp-ota-tool.py
M pySim/ota.py
M requirements.txt
M setup.py
A tests/pySim-smpp2sim_test/pySim-smpp2sim_test.cfg
A tests/pySim-smpp2sim_test/pySim-smpp2sim_test.sh
7 files changed, 409 insertions(+), 4 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/pysim refs/changes/83/41983/1
diff --git a/contrib/jenkins.sh b/contrib/jenkins.sh
index de1d9f8..b95e80a 100755
--- a/contrib/jenkins.sh
+++ b/contrib/jenkins.sh
@@ -42,6 +42,9 @@
# Run pySim-shell integration tests (requires physical cards)
python3 -m unittest discover -v -s ./tests/pySim-shell_test/
+
+ # Run pySim-smpp2sim test
+ tests/pySim-smpp2sim_test/pySim-smpp2sim_test.sh
;;
"distcheck")
virtualenv -p python3 venv --system-site-packages
diff --git a/contrib/smpp-ota-tool.py b/contrib/smpp-ota-tool.py
new file mode 100755
index 0000000..3dc2185
--- /dev/null
+++ b/contrib/smpp-ota-tool.py
@@ -0,0 +1,235 @@
+#!/usr/bin/env python3
+
+# (C) 2026 by sysmocom - s.f.m.c. GmbH
+# All Rights Reserved
+#
+# Author: Harald Welte, Philipp Maier
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import argparse
+import logging
+import smpplib.gsm
+import smpplib.client
+import smpplib.consts
+import time
+from pySim.ota import OtaKeyset, OtaDialectSms, OtaAlgoCrypt, OtaAlgoAuth,
CNTR_REQ, RC_CC_DS, POR_REQ
+from pySim.utils import b2h, h2b, is_hexstr
+from pathlib import Path
+
+logger = logging.getLogger(Path(__file__).stem)
+
+class SmppHandler:
+ client = None
+
+ def __init__(self, host: str, port: int,
+ system_id: str, password: str,
+ ota_keyset: OtaKeyset, spi: dict, tar: bytes):
+ """
+ Initialize connection to SMPP server and set static OTA SMS-TPDU
ciphering parameters
+ Args:
+ host : Hostname or IPv4/IPv6 address of the SMPP server
+ port : TCP Port of the SMPP server
+ system_id: SMPP System-ID used by ESME (client) to bind
+ password: SMPP Password used by ESME (client) to bind
+ ota_keyset: OTA keyset to be used for SMS-TPDU ciphering
+ spi: Security Parameter Indicator (SPI) to be used for
SMS-TPDU ciphering
+ tar: Toolkit Application Reference (TAR) of the targeted card
application
+ """
+
+ # Create and connect SMPP client
+ client = smpplib.client.Client(host, port,
allow_unknown_opt_params=True)
+ client.set_message_sent_handler(self.message_sent_handler)
+ client.set_message_received_handler(self.message_received_handler)
+ client.connect()
+ client.bind_transceiver(system_id=system_id, password=password)
+ self.client = client
+
+ # Setup static OTA parameters
+ self.ota_dialect = OtaDialectSms()
+ self.ota_keyset = ota_keyset
+ self.tar = tar
+ self.spi = spi
+
+ def __del__(self):
+ if self.client:
+ self.client.unbind()
+ self.client.disconnect()
+
+ def message_received_handler(self, pdu):
+ if pdu.short_message:
+ logger.info("SMS-TPDU received: %s", b2h(pdu.short_message))
+ try:
+ dec = self.ota_dialect.decode_resp(self.ota_keyset, self.spi,
pdu.short_message)
+ except ValueError:
+ # Retry to decoding with ciphering disabled (in case the card
has problems to decode the SMS-TDPU
+ # we have sent, the response will contain an unencrypted error
message)
+ spi = self.spi.copy()
+ spi['por_shall_be_ciphered'] = False
+ spi['por_rc_cc_ds'] = 'no_rc_cc_ds'
+ dec = self.ota_dialect.decode_resp(self.ota_keyset, spi,
pdu.short_message)
+ logger.info("SMS-TPDU decoded: %s", dec)
+ self.response = dec
+ return None
+
+ def message_sent_handler(self, pdu):
+ logger.debug("SMS-TPDU sent: pdu_sequence=%s pdu_message_id=%s",
pdu.sequence, pdu.message_id)
+
+ def transceive_sms_tpdu(self, tpdu: bytes, src_addr: str, dest_addr: str,
timeout: int) -> tuple:
+ """
+ Transceive SMS-TPDU. This method sends the SMS-TPDU to the SMPP
server, and waits for a response. The method
+ returns when the response is received.
+
+ Args:
+ tpdu : short message content (plaintext)
+ src_addr : short message source address
+ dest_addr : short message destination address
+ timeout : timeout after which this method should give up
waiting for a response
+ Returns:
+ tuple containing the response (plaintext)
+ """
+
+ logger.info("SMS-TPDU sending: %s...", b2h(tpdu))
+
+ self.client.send_message(
+ # TODO: add parameters to switch source_addr_ton and dest_addr_ton
between SMPP_TON_INTL and SMPP_NPI_ISDN
+ source_addr_ton=smpplib.consts.SMPP_TON_INTL,
+ source_addr=src_addr,
+ dest_addr_ton=smpplib.consts.SMPP_TON_INTL,
+ destination_addr=dest_addr,
+ short_message=tpdu,
+ # TODO: add parameters to set data_coding and esm_class
+ data_coding=smpplib.consts.SMPP_ENCODING_BINARY,
+ esm_class=smpplib.consts.SMPP_GSMFEAT_UDHI,
+ protocol_id=0x7f,
+ # TODO: add parameter to use registered delivery
+ # registered_delivery=True,
+ )
+
+ logger.info("SMS-TPDU sent, waiting for response...")
+ timestamp_sent=int(time.time())
+ self.response = None
+ while self.response is None:
+ self.client.poll()
+ if int(time.time()) - timestamp_sent > timeout:
+ raise ValueError("Timeout reached, no response SMS-TPDU
received!")
+ return self.response
+
+ def transceive_apdu(self, apdu: bytes, src_addr: str, dest_addr: str,
timeout: int) -> tuple[bytes, bytes]:
+ """
+ Transceive APDU. This method wraps the given APDU into an SMS-TPDU,
sends it to the SMPP server and waits for
+ the response. When the response is received, the last response data
and the last status word is extracted from
+ the response and returned to the caller.
+
+ Args:
+ apdu : one or more concatenated APDUs
+ src_addr : short message source address
+ dest_addr : short message destination address
+ timeout : timeout after which this method should give up
waiting for a response
+ Returns:
+ tuple containing the last response data and the last status
word as byte strings
+ """
+
+ logger.info("C-APDU sending: %s..." % b2h(apdu))
+
+ # translate to Secured OTA RFM
+ secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar,
self.spi, apdu=apdu)
+ # add user data header
+ tpdu = b'\x02\x70\x00' + secured
+ # send via SMPP
+ response = self.transceive_sms_tpdu(tpdu, src_addr, dest_addr,
timeout)
+
+ # Extract last_response_data and last_status_word from the response
+ sw = None
+ resp = None
+ for container in response:
+ if container:
+ container_dict = dict(container)
+ resp = container_dict.get('last_response_data')
+ sw = container_dict.get('last_status_word')
+ if resp is None:
+ raise ValueError("Response does not contain any
last_response_data, no R-APDU received!")
+ if sw is None:
+ raise ValueError("Response does not contain any last_status_word,
no R-APDU received!")
+
+ logger.info("R-APDU received: %s %s", resp, sw)
+ return h2b(resp), h2b(sw)
+
+if __name__ == '__main__':
+ option_parser = argparse.ArgumentParser(description='CSV importer for
pySim-shell\'s PostgreSQL Card Key Provider',
+
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ option_parser.add_argument("--host", help="Host/IP of the SMPP server",
default="localhost")
+ option_parser.add_argument("--port", help="TCP port of the SMPP server",
default=2775, type=int)
+ option_parser.add_argument("--system-id", help="System ID to use to bind
to the SMPP server", default="test")
+ option_parser.add_argument("--password", help="Password to use to bind to
the SMPP server", default="test")
+ option_parser.add_argument("--verbose", help="Enable verbose logging",
action='store_true', default=False)
+ algo_crypt_choices = []
+ algo_crypt_classes = OtaAlgoCrypt.__subclasses__()
+ for cls in algo_crypt_classes:
+ algo_crypt_choices.append(cls.enum_name)
+ option_parser.add_argument("--algo-crypt", choices=algo_crypt_choices,
default='triple_des_cbc2',
+ help="OTA crypt algorithm")
+ algo_auth_choices = []
+ algo_auth_classes = OtaAlgoAuth.__subclasses__()
+ for cls in algo_auth_classes:
+ algo_auth_choices.append(cls.enum_name)
+ option_parser.add_argument("--algo-auth", choices=algo_auth_choices,
default='triple_des_cbc2',
+ help="OTA auth algorithm")
+ option_parser.add_argument('--kic', required=True, type=is_hexstr,
help='OTA key (KIC)')
+ option_parser.add_argument('--kic_idx', default=1, type=int, help='OTA key
index (KIC)')
+ option_parser.add_argument('--kid', required=True, type=is_hexstr,
help='OTA key (KID)')
+ option_parser.add_argument('--kid_idx', default=1, type=int, help='OTA key
index (KID)')
+ option_parser.add_argument('--cntr', default=0, type=int, help='replay
protection counter')
+ option_parser.add_argument('--tar', required=True, type=is_hexstr,
help='Toolkit Application Reference')
+ option_parser.add_argument("--cntr_req",
choices=CNTR_REQ.decmapping.values(), default='no_counter',
+ help="Counter requirement")
+ option_parser.add_argument('--ciphering', default=True, type=bool,
help='Enable ciphering')
+ option_parser.add_argument("--rc-cc-ds",
choices=RC_CC_DS.decmapping.values(), default='cc',
+ help="message check (rc=redundency check,
cc=crypt. checksum, ds=digital signature)")
+ option_parser.add_argument('--por-in-submit', default=False, type=bool,
+ help='require PoR to be sent via SMS-SUBMIT')
+ option_parser.add_argument('--por-shall-be-ciphered', default=True,
type=bool, help='require encrypted PoR')
+ option_parser.add_argument("--por-rc-cc-ds",
choices=RC_CC_DS.decmapping.values(), default='cc',
+ help="PoR check (rc=redundency check, cc=crypt.
checksum, ds=digital signature)")
+ option_parser.add_argument("--por_req",
choices=POR_REQ.decmapping.values(), default='por_required',
+ help="Proof of Receipt requirements")
+ option_parser.add_argument('--src-addr', default='12', type=str,
help='TODO')
+ option_parser.add_argument('--dest-addr', default='23', type=str,
help='TODO')
+ option_parser.add_argument('--timeout', default=10, type=int, help='TODO')
+ option_parser.add_argument('-a', '--apdu', action='append', required=True,
type=is_hexstr, help='C-APDU to send')
+ opts = option_parser.parse_args()
+
+ logging.basicConfig(level=logging.DEBUG if opts.verbose else logging.INFO,
+ format='%(asctime)s %(levelname)s %(message)s',
+ datefmt='%Y-%m-%d %H:%M:%S')
+
+ ota_keyset = OtaKeyset(algo_crypt=opts.algo_crypt,
+ kic_idx=opts.kic_idx,
+ kic=h2b(opts.kic),
+ algo_auth=opts.algo_auth,
+ kid_idx=opts.kic_idx,
+ kid=h2b(opts.kid),
+ cntr=opts.cntr)
+ spi = {'counter' : opts.cntr_req,
+ 'ciphering' : opts.ciphering,
+ 'rc_cc_ds': opts.rc_cc_ds,
+ 'por_in_submit':opts.por_in_submit,
+ 'por_shall_be_ciphered':opts.por_shall_be_ciphered,
+ 'por_rc_cc_ds': opts.por_rc_cc_ds,
+ 'por': opts.por_req}
+ apdu = h2b("".join(opts.apdu))
+
+ smpp_handler = SmppHandler(opts.host, opts.port, opts.system_id,
opts.password, ota_keyset, spi, h2b(opts.tar))
+ resp, sw = smpp_handler.transceive_apdu(apdu, opts.src_addr,
opts.dest_addr, opts.timeout)
+ print("%s %s" % (b2h(resp), b2h(sw)))
diff --git a/pySim/ota.py b/pySim/ota.py
index a6b563d..fbb9d45 100644
--- a/pySim/ota.py
+++ b/pySim/ota.py
@@ -57,12 +57,13 @@
'last_response_data'/HexAdapter(GreedyBytes))
RC_CC_DS = Enum(BitsInteger(2), no_rc_cc_ds=0, rc=1, cc=2, ds=3)
+CNTR_REQ = Enum(BitsInteger(2), no_counter=0, counter_no_replay_or_seq=1,
counter_must_be_higher=2, counter_must_be_lower=3)
+POR_REQ = Enum(BitsInteger(2), no_por=0, por_required=1, por_only_when_error=2)
# TS 102 225 Section 5.1.1 + TS 31.115 Section 4.2
SPI = BitStruct( # first octet
Padding(3),
- 'counter'/Enum(BitsInteger(2), no_counter=0, counter_no_replay_or_seq=1,
- counter_must_be_higher=2, counter_must_be_lower=3),
+ 'counter'/CNTR_REQ,
'ciphering'/Flag,
'rc_cc_ds'/RC_CC_DS,
# second octet
@@ -70,8 +71,7 @@
'por_in_submit'/Flag,
'por_shall_be_ciphered'/Flag,
'por_rc_cc_ds'/RC_CC_DS,
- 'por'/Enum(BitsInteger(2), no_por=0,
- por_required=1, por_only_when_error=2)
+ 'por'/POR_REQ
)
# TS 102 225 Section 5.1.2
diff --git a/requirements.txt b/requirements.txt
index 2ffd999..4ceec45 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -15,3 +15,4 @@
packaging
git+https://github.com/hologram-io/smpp.pdu
smpp.twisted3 @ git+https://github.com/jookies/smpp.twisted
+smpplib
diff --git a/setup.py b/setup.py
index dffff5b..be81122 100644
--- a/setup.py
+++ b/setup.py
@@ -55,6 +55,7 @@
"service-identity",
"pyopenssl",
"requests",
+ "smpplib",
],
"CardKeyProviderPgsql": [
"psycopg2-binary",
diff --git a/tests/pySim-smpp2sim_test/pySim-smpp2sim_test.cfg
b/tests/pySim-smpp2sim_test/pySim-smpp2sim_test.cfg
new file mode 100644
index 0000000..1c2a953
--- /dev/null
+++ b/tests/pySim-smpp2sim_test/pySim-smpp2sim_test.cfg
@@ -0,0 +1,9 @@
+# Card parameter:
+ICCID="8949440000001155314"
+KIC='51D4FC44BCBA7C4589DFADA3297720AF'
+KID='0449699C472CE71E2FB7B56245EF7684'
+
+# Testcase: Send OTA-SMS that selects DF.GSM and returns the select response
+TAR='B00010'
+APDU='A0A40000027F20A0C0000016'
+EXPECTED_RESPONSE='0000ffff7f2002000000000009b106350400838a838a 9000'
\ No newline at end of file
diff --git a/tests/pySim-smpp2sim_test/pySim-smpp2sim_test.sh
b/tests/pySim-smpp2sim_test/pySim-smpp2sim_test.sh
new file mode 100755
index 0000000..2ca0a9e
--- /dev/null
+++ b/tests/pySim-smpp2sim_test/pySim-smpp2sim_test.sh
@@ -0,0 +1,156 @@
+#!/bin/bash
+
+# Utility to verify the functionality of pySim-trace.py
+#
+# (C) 2026 by sysmocom - s.f.m.c. GmbH
+# All Rights Reserved
+#
+# Author: Philipp Maier
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+PYSIM_SMPP2SIM=./pySim-smpp2sim.py
+PYSIM_SMPP2SIM_LOG=./pySim-smpp2sim.log
+PYSIM_SMPP2SIM_PORT=2775
+PYSIM_SMPP2SIM_TIMEOUT=10
+PYSIM_SMPPOTATOOL=./contrib/smpp-ota-tool.py
+PYSIM_SMPPOTATOOL_LOG=./smpp-ota-tool.log
+PYSIM_SHELL=./pySim-shell.py
+
+function dump_logs {
+ echo ""
+ echo "$PYSIM_SMPPOTATOOL_LOG"
+ echo "------------8<------------"
+ cat $PYSIM_SMPPOTATOOL_LOG
+ echo "------------8<------------"
+ echo ""
+ echo "$PYSIM_SMPP2SIM_LOG"
+ echo "------------8<------------"
+ cat $PYSIM_SMPP2SIM_LOG
+ echo "------------8<------------"
+}
+
+function send_test_request {
+ echo ""
+ echo "Sending request to SMPP server:"
+ TAR=$1
+ C_APDU=$2
+ R_APDU_EXPECTED=$3
+
+ echo "Sending: $C_APDU"
+ COMMANDLINE="$PYSIM_SMPPOTATOOL --verbose --port $PYSIM_SMPP2SIM_PORT
--kic $KIC --kid $KID --tar $TAR --apdu $C_APDU"
+ echo "Commandline: $COMMANDLINE"
+ R_APDU=`$COMMANDLINE 2> $PYSIM_SMPPOTATOOL_LOG`
+ if [ $? -ne 0 ]; then
+ echo "Unable to send request! -- failed!"
+ dump_logs
+ exit 1
+ fi
+
+ echo "Got response from SMPP server:"
+ echo "Sent: $C_APDU"
+ echo "Received: $R_APDU"
+ echo "Expected: $R_APDU_EXPECTED"
+ if [ "$R_APDU" != "$R_APDU_EXPECTED" ]; then
+ echo "Response does not match the expected response! -- failed!"
+ dump_logs
+ exit 1
+ fi
+ echo "Response matches the expected response -- success!"
+ echo ""
+}
+
+function start_smpp_server {
+ PCSC_READER=$1
+
+ # Start the SMPP server
+ echo ""
+ echo "Starting SMPP server:"
+
+ COMMANDLINE="$PYSIM_SMPP2SIM -p $PCSC_READER --smpp-bind-port
$PYSIM_SMPP2SIM_PORT --apdu-trace"
+ echo "Commandline: $COMMANDLINE"
+ $COMMANDLINE > $PYSIM_SMPP2SIM_LOG 2>&1 &
+ PYSIM_SMPP2SIM_PID=$!
+ trap 'kill $PYSIM_SMPP2SIM_PID' EXIT
+ echo "SMPP server started (PID=$PYSIM_SMPP2SIM_PID)"
+
+ # Wait until the SMPP server is reachable
+ RC=1
+ RETRY_COUNT=0
+ while [ $RC -ne 0 ]; do
+ nc -z localhost $PYSIM_SMPP2SIM_PORT
+ RC=$?
+ ((RETRY_COUNT++))
+ if [ $RETRY_COUNT -gt $PYSIM_SMPP2SIM_TIMEOUT ]; then
+ echo "SMPP server not reachable (port=$PYSIM_SMPP2SIM_PORT) --
abort"
+ dump_logs
+ exit 1
+ fi
+ sleep 1
+ done
+ echo "SMPP server reachable (port=$PYSIM_SMPP2SIM_PORT)"
+}
+
+function find_card_by_iccid {
+ # Find reader number of the card
+ ICCID=$1
+
+ echo ""
+ echo "Searching for card:"
+ echo "ICCID: \"$ICCID\""
+
+ if [ -z "$ICCID" ]; then
+ echo "invalid ICCID, zero length ICCID is not allowed! -- abort"
+ exit 1
+ fi
+
+ PCSC_READER_COUNT=`pcsc_scan -rn | wc -l`
+ for PCSC_READER in $(seq 0 $(($PCSC_READER_COUNT-1))); do
+ echo "probing card in reader $PCSC_READER ..."
+ EF_ICCID_DECODED=`$PYSIM_SHELL -p $PCSC_READER --noprompt -e 'select
EF.ICCID' -e 'read_binary_decoded --oneline' 2> /dev/null | tail -1`
+ echo $EF_ICCID_DECODED | grep $ICCID > /dev/null
+ if [ $? -eq 0 ]; then
+ echo "Found card in reader $PCSC_READER"
+ return $PCSC_READER
+ fi
+ done
+
+ echo "Card with ICCID \"$ICCID\" not found -- abort"
+ exit 1
+}
+
+export PYTHONPATH=./
+
+echo "pySim-smpp2sim_test - a test program to test pySim-smpp2sim.py"
+echo "=============================================================="
+
+# TODO: At the moment we can only have one card and one testcase. This is
+# sufficient for now. We can extend this later as needed.
+
+# Read test parameters from config from file
+TEST_CONFIG_FILE=${0%.*}.cfg
+echo "using config file: $TEST_CONFIG_FILE"
+if ! [ -e "$TEST_CONFIG_FILE" ]; then
+ echo "test configuration file does not exist! -- abort"
+ exit 1
+fi
+. $TEST_CONFIG_FILE
+
+# Execute testcase
+find_card_by_iccid $ICCID
+start_smpp_server $?
+send_test_request $TAR $APDU "$EXPECTED_RESPONSE"
+
+
+
--
To view, visit https://gerrit.osmocom.org/c/pysim/+/41983?usp=email
To unsubscribe, or for help writing mail filters, visit
https://gerrit.osmocom.org/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: pysim
Gerrit-Branch: master
Gerrit-Change-Id: If25e38be004cc1c7aeeb130431831377e78fe28d
Gerrit-Change-Number: 41983
Gerrit-PatchSet: 1
Gerrit-Owner: dexter <[email protected]>