andreydevyatkin commented on code in PR #30327:
URL: https://github.com/apache/beam/pull/30327#discussion_r1497324625
##########
.test-infra/metrics/sync/github/sync_workflows.py:
##########
@@ -600,52 +319,86 @@ def append_workflow_runs(workflow, runs):
else:
number_of_runs_to_add =\
int(GH_NUMBER_OF_WORKFLOW_RUNS_TO_FETCH) - len(workflow.runs)
- workflow.runs.extend([(0, 'None', 'None')] * number_of_runs_to_add)
+ workflow.runs.extend(
+ [WorkflowRun(0, "None", "None", workflow.id, "None")] *
number_of_runs_to_add
+ )
if len(workflow.runs) >= int(GH_NUMBER_OF_WORKFLOW_RUNS_TO_FETCH):
workflow_ids_to_fetch_extra_runs.pop(workflow_id, None)
print(f"Successfully fetched extra workflow runs for:
{workflow.filename}")
page += 1
print("Successfully fetched workflow runs details")
for workflow in list(workflows.values()):
- runs = sorted(workflow.runs, key=lambda r: r[0], reverse=True)
+ runs = sorted(workflow.runs, key=lambda r: r.id, reverse=True)
workflow.runs = runs[:int(GH_NUMBER_OF_WORKFLOW_RUNS_TO_FETCH)]
return list(workflows.values())
-def database_operations(connection, workflows):
+
+def save_workflows(workflows):
+ connection = init_db_connection()
# Create the table and update it with the latest workflow runs
if not workflows:
return
cursor = connection.cursor()
workflows_table_name = "github_workflows"
+ workflow_runs_table_name = "github_workflow_runs"
cursor.execute(f"DROP TABLE IF EXISTS {workflows_table_name};")
- create_table_query = f"""
- CREATE TABLE IF NOT EXISTS {workflows_table_name} (
- workflow_id integer NOT NULL PRIMARY KEY,
- job_name text NOT NULL,
- job_yml_filename text NOT NULL,
- dashboard_category text NOT NULL"""
- for i in range(int(GH_NUMBER_OF_WORKFLOW_RUNS_TO_FETCH)):
- create_table_query += f""",
- run{i+1} text,
- run{i+1}Id text"""
- create_table_query += ")\n"
- cursor.execute(create_table_query)
- insert_query = f"INSERT INTO {workflows_table_name} VALUES "
+ cursor.execute(f"DROP TABLE IF EXISTS {workflow_runs_table_name};")
+ create_workflows_table_query = f"""
+ CREATE TABLE IF NOT EXISTS {workflows_table_name} (
+ workflow_id integer NOT NULL PRIMARY KEY,
+ name text NOT NULL,
+ filename text NOT NULL,
+ url text NOT NULL,
+ dashboard_category text NOT NULL,
+ threshold real NOT NULL)\n"""
+ create_workflow_runs_table_query = f"""
+ CREATE TABLE IF NOT EXISTS {workflow_runs_table_name} (
+ run_id text NOT NULL PRIMARY KEY,
+ status text NOT NULL,
+ url text NOT NULL,
+ workflow_id integer NOT NULL FOREIGN KEY,
+ started_at timestamp with time zone NOT NULL)\n"""
+ cursor.execute(create_workflows_table_query)
+ cursor.execute(create_workflow_runs_table_query)
+ insert_workflows_query = f"""
+ INSERT INTO {workflows_table_name} (workflow_id, name, filename, url,
dashboard_category, threshold)
+ VALUES %s"""
+ insert_workflow_runs_query = f"""
+ INSERT INTO {workflow_runs_table_name} (run_id, status, url, workflow_id,
started_at)
+ VALUES %s"""
+ insert_workflows = []
+ insert_workflow_runs = []
for workflow in workflows:
- category = get_dashboard_category(workflow.name)
- row_insert =\
-
f"(\'{workflow.id}\',\'{workflow.name}\',\'{workflow.filename}\',\'{category}\'"
- for _, status, url in workflow.runs:
- row_insert += f",\'{status}\',\'{url}\'"
- insert_query += f"{row_insert}),"
- insert_query = insert_query[:-1] + ";"
- print(insert_query)
- cursor.execute(insert_query)
+ insert_workflows.append(
+ (
+ workflow.id,
+ workflow.name,
+ workflow.filename,
+ workflow.url,
+ workflow.category,
+ workflow.threshold
+ )
+ )
+ for run in workflow.runs:
+ if run.id != 0:
+ started_at = run.started_at.replace("T", " ")
+ insert_workflow_runs.append(
+ (
+ run.id,
+ run.status,
+ run.url,
+ workflow.id,
+ started_at
+ )
+ )
+ psycopg2.extras.execute_values(cursor, insert_workflows_query,
insert_workflows)
+ psycopg2.extras.execute_values(cursor, insert_workflow_runs_query,
insert_workflow_runs)
cursor.close()
connection.commit()
connection.close()
Review Comment:
@damccorm After analyzing the data and the Grafana dashboard schema, we
concluded that we can normalize the DB schema to eliminate data redundancy.
In essence, we need only one GCP function/script to fetch the data and store
it in the DB; the grouping needed for displaying test statistics and for
alerting can be done with SQL queries.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]