Tim Andersson has proposed merging ~andersson123/autopkgtest-cloud:sqlite-writer-bugfixes into autopkgtest-cloud:master.
Requested reviews: Canonical's Ubuntu QA (canonical-ubuntu-qa) For more details, see: https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/462218 -- Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:sqlite-writer-bugfixes into autopkgtest-cloud:master.
diff --git a/charms/focal/autopkgtest-web/webcontrol/download-all-results b/charms/focal/autopkgtest-web/webcontrol/download-all-results index 0c0e569..457fcd5 100755 --- a/charms/focal/autopkgtest-web/webcontrol/download-all-results +++ b/charms/focal/autopkgtest-web/webcontrol/download-all-results @@ -21,6 +21,7 @@ import tarfile import urllib.parse import amqplib.client_0_8 as amqp +from amqplib.client_0_8 import AMQPException import swiftclient from distro_info import UbuntuDistroInfo from helpers.utils import SqliteWriterConfig, get_test_id @@ -50,7 +51,6 @@ def amqp_connect(): return amqp_con -# def list_remote_container(container_url): def list_remote_container(container_name, swift_conn): LOGGER.debug("Listing container %s", container_name) _, list_of_test_results = swift_conn.get_container( @@ -78,7 +78,6 @@ def list_our_results(release): def fetch_one_result(container_name, object_name, swift_conn): """Download one result URL from swift and add it to the DB""" (release, arch, _, src, run_id, _) = object_name.split("/") - test_id = get_test_id(db_con, release, arch, src) _, contents = swift_conn.get_object(container_name, object_name) tar_bytes = io.BytesIO(contents) try: @@ -142,32 +141,42 @@ def fetch_one_result(container_name, object_name, swift_conn): env_vars.append("=".join([env, value])) # Insert the write request into the queue - complete_amqp = amqp_con.channel() - complete_amqp.access_request( - "/complete", active=True, read=False, write=True - ) - complete_amqp.exchange_declare( - SqliteWriterConfig.writer_exchange_name, - "fanout", - durable=True, - auto_delete=False, - ) - write_me_msg = { - "test_id": test_id, - "run_id": run_id, - "version": ver, - "triggers": test_triggers, - "duration": duration, - "exitcode": exitcode, - "requester": requester, - "env": ",".join(env_vars), - "uuid": test_uuid, - } - complete_amqp.basic_publish( - amqp.Message(json.dumps(write_me_msg), delivery_mode=2), - SqliteWriterConfig.writer_exchange_name, - "", - ) + while True: + try: + complete_amqp = amqp_con.channel() + complete_amqp.access_request( + "/complete", active=True, read=False, write=True + ) + complete_amqp.exchange_declare( + SqliteWriterConfig.writer_exchange_name, + "fanout", + durable=True, + auto_delete=False, + ) + write_me_msg = { + "run_id": run_id, + "version": ver, + "triggers": test_triggers, + "duration": duration, + "exitcode": exitcode, + "requester": requester, + "env": ",".join(env_vars), + "uuid": test_uuid, + "release": release, + "arch": arch, + "package": src, + } + complete_amqp.basic_publish( + amqp.Message(json.dumps(write_me_msg), delivery_mode=2), + SqliteWriterConfig.writer_exchange_name, + "", + ) + return + except Exception as _: + amqp_con = amqp_connect() + # maybe we should sleep here? + # or re-init the amqp_con + pass def fetch_container(release, swift_conn): @@ -246,9 +255,7 @@ if __name__ == "__main__": for release in releases: fetch_container( release, - os.path.join( - config["web"]["SwiftURL"], f"autopkgtest-{release}" - ), + swift_conn, ) finally: if db_con: diff --git a/charms/focal/autopkgtest-web/webcontrol/download-results b/charms/focal/autopkgtest-web/webcontrol/download-results index e744eff..14d522c 100755 --- a/charms/focal/autopkgtest-web/webcontrol/download-results +++ b/charms/focal/autopkgtest-web/webcontrol/download-results @@ -9,7 +9,7 @@ import sqlite3 import urllib.parse import amqplib.client_0_8 as amqp -from helpers.utils import SqliteWriterConfig, get_test_id +from helpers.utils import SqliteWriterConfig EXCHANGE_NAME = "testcomplete.fanout" @@ -81,36 +81,43 @@ def process_message(msg, db_con): msg.channel.basic_ack(msg.delivery_tag) return - test_id = get_test_id(db_con, release, arch, package) - # add to queue instead of writing to db - complete_amqp = amqp_con.channel() - complete_amqp.access_request( - "/complete", active=True, read=False, write=True - ) - complete_amqp.exchange_declare( - SqliteWriterConfig.writer_exchange_name, - "fanout", - durable=True, - auto_delete=False, - ) - write_me_msg = { - "test_id": test_id, - "run_id": run_id, - "version": version, - "triggers": triggers, - "duration": duration, - "exitcode": exitcode, - "requester": requester, - "env": info.get("env", ""), - "uuid": test_uuid, - } - complete_amqp.basic_publish( - amqp.Message(json.dumps(write_me_msg), delivery_mode=2), - SqliteWriterConfig.writer_exchange_name, - "", - ) + while True: + try: + # add to queue instead of writing to db + complete_amqp = amqp_con.channel() + complete_amqp.access_request( + "/complete", active=True, read=False, write=True + ) + complete_amqp.exchange_declare( + SqliteWriterConfig.writer_exchange_name, + "fanout", + durable=True, + auto_delete=False, + ) + write_me_msg = { + "run_id": run_id, + "version": version, + "triggers": triggers, + "duration": duration, + "exitcode": exitcode, + "requester": requester, + "env": info.get("env", ""), + "uuid": test_uuid, + "release": release, + "arch": arch, + "package": package, + } + complete_amqp.basic_publish( + amqp.Message(json.dumps(write_me_msg), delivery_mode=2), + SqliteWriterConfig.writer_exchange_name, + "", + ) + + msg.channel.basic_ack(msg.delivery_tag) + return + except Exception as _: + amqp_con = amqp_connect() - msg.channel.basic_ack(msg.delivery_tag) if __name__ == "__main__": diff --git a/charms/focal/autopkgtest-web/webcontrol/helpers/utils.py b/charms/focal/autopkgtest-web/webcontrol/helpers/utils.py index 4306a79..3deca9a 100644 --- a/charms/focal/autopkgtest-web/webcontrol/helpers/utils.py +++ b/charms/focal/autopkgtest-web/webcontrol/helpers/utils.py @@ -19,7 +19,6 @@ class SqliteWriterConfig: writer_exchange_name = "sqlite-write-me.fanout" checkpoint_interval = 5 # minutes test_column_names = [ - "test_id", "run_id", "version", "triggers", @@ -28,6 +27,9 @@ class SqliteWriterConfig: "requester", "env", "uuid", + "release", + "arch", + "package", ] diff --git a/charms/focal/autopkgtest-web/webcontrol/sqlite-writer b/charms/focal/autopkgtest-web/webcontrol/sqlite-writer index b0046db..a08d459 100755 --- a/charms/focal/autopkgtest-web/webcontrol/sqlite-writer +++ b/charms/focal/autopkgtest-web/webcontrol/sqlite-writer @@ -13,7 +13,7 @@ sqlite3.paramstyle = "named" import urllib.parse import amqplib.client_0_8 as amqp -from helpers.utils import SqliteWriterConfig, init_db +from helpers.utils import SqliteWriterConfig, init_db, get_test_id LAST_CHECKPOINT = datetime.datetime.now() @@ -72,13 +72,19 @@ def process_message(msg, db_con): if isinstance(body, bytes): body = body.decode("UTF-8", errors="replace") info = json.loads(body) - logging.info("Message is: \n%s" % json.dumps(info, indent=2)) + # need to make it so the test_id isn't retrieved by download-results or download-all-results, but here + logging.info("Message is: \n%s" % json.dumps(info)) if not check_msg(info): logging.error( "Message has incorrect keys! Ignoring\n%s" % json.dumps(info, indent=2) ) return + # these aren't currently in the messages! + info["test_id"] = get_test_id(db_con, info["release"], info["arch"], info["package"]) + del info["release"] + del info["arch"] + del info["package"] with db_con: c = db_con.cursor() c.execute( @@ -89,7 +95,7 @@ def process_message(msg, db_con): ), info, ) - logging.info("Inserted the following entry into the db:\n%s" % body) + logging.info("Inserted the following entry into the db:\n%s" % json.dumps(info)) def msg_callback(msg, db_con):
-- Mailing list: https://launchpad.net/~canonical-ubuntu-qa Post to : canonical-ubuntu-qa@lists.launchpad.net Unsubscribe : https://launchpad.net/~canonical-ubuntu-qa More help : https://help.launchpad.net/ListHelp