volatilemolotov commented on code in PR #30327:
URL: https://github.com/apache/beam/pull/30327#discussion_r1494423406


##########
.test-infra/metrics/sync/github/sync_workflows.py:
##########
@@ -600,52 +319,86 @@ def append_workflow_runs(workflow, runs):
       else:
         number_of_runs_to_add =\
           int(GH_NUMBER_OF_WORKFLOW_RUNS_TO_FETCH) - len(workflow.runs)
-        workflow.runs.extend([(0, 'None', 'None')] * number_of_runs_to_add)
+        workflow.runs.extend(
+          [WorkflowRun(0, "None", "None", workflow.id, "None")] * 
number_of_runs_to_add
+        )
       if len(workflow.runs) >= int(GH_NUMBER_OF_WORKFLOW_RUNS_TO_FETCH):
           workflow_ids_to_fetch_extra_runs.pop(workflow_id, None)
       print(f"Successfully fetched extra workflow runs for: 
{workflow.filename}")
     page += 1
   print("Successfully fetched workflow runs details")
 
   for workflow in list(workflows.values()):
-    runs = sorted(workflow.runs, key=lambda r: r[0], reverse=True)
+    runs = sorted(workflow.runs, key=lambda r: r.id, reverse=True)
     workflow.runs = runs[:int(GH_NUMBER_OF_WORKFLOW_RUNS_TO_FETCH)]
 
   return list(workflows.values())
 
-def database_operations(connection, workflows):
+
+def save_workflows(workflows):
+  connection = init_db_connection()
   # Create the table and update it with the latest workflow runs
   if not workflows:
     return
   cursor = connection.cursor()
   workflows_table_name = "github_workflows"
+  workflow_runs_table_name = "github_workflow_runs"
   cursor.execute(f"DROP TABLE IF EXISTS {workflows_table_name};")
-  create_table_query = f"""
-  CREATE TABLE IF NOT EXISTS {workflows_table_name} (
-    workflow_id integer NOT NULL PRIMARY KEY,
-    job_name text NOT NULL,
-    job_yml_filename text NOT NULL,
-    dashboard_category text NOT NULL"""
-  for i in range(int(GH_NUMBER_OF_WORKFLOW_RUNS_TO_FETCH)):
-    create_table_query += f""",
-    run{i+1} text,
-    run{i+1}Id text"""
-  create_table_query += ")\n"
-  cursor.execute(create_table_query)
-  insert_query = f"INSERT INTO {workflows_table_name} VALUES "
+  cursor.execute(f"DROP TABLE IF EXISTS {workflow_runs_table_name};")
+  create_workflows_table_query = f"""
+    CREATE TABLE IF NOT EXISTS {workflows_table_name} (
+      workflow_id integer NOT NULL PRIMARY KEY,
+      name text NOT NULL,
+      filename text NOT NULL,
+      url text NOT NULL,
+      dashboard_category text NOT NULL,
+      threshold real NOT NULL)\n"""
+  create_workflow_runs_table_query = f"""
+    CREATE TABLE IF NOT EXISTS {workflow_runs_table_name} (
+      run_id text NOT NULL PRIMARY KEY,
+      status text NOT NULL,
+      url text NOT NULL,
+      workflow_id integer NOT NULL FOREIGN KEY,
+      started_at timestamp with time zone NOT NULL)\n"""
+  cursor.execute(create_workflows_table_query)
+  cursor.execute(create_workflow_runs_table_query)
+  insert_workflows_query = f"""
+    INSERT INTO {workflows_table_name} (workflow_id, name, filename, url, 
dashboard_category, threshold)
+    VALUES %s"""
+  insert_workflow_runs_query = f"""
+    INSERT INTO {workflow_runs_table_name} (run_id, status, url, workflow_id, 
started_at)
+    VALUES %s"""
+  insert_workflows = []
+  insert_workflow_runs = []
   for workflow in workflows:
-    category = get_dashboard_category(workflow.name)
-    row_insert =\
-      
f"(\'{workflow.id}\',\'{workflow.name}\',\'{workflow.filename}\',\'{category}\'"
-    for _, status, url in workflow.runs:
-      row_insert += f",\'{status}\',\'{url}\'"
-    insert_query += f"{row_insert}),"
-  insert_query = insert_query[:-1] + ";"
-  print(insert_query)
-  cursor.execute(insert_query)
+    insert_workflows.append(
+      (
+        workflow.id,
+        workflow.name,
+        workflow.filename,
+        workflow.url,
+        workflow.category,
+        workflow.threshold
+      )
+    )
+    for run in workflow.runs:
+      if run.id != 0:
+        started_at = run.started_at.replace("T", " ")
+        insert_workflow_runs.append(
+          (
+            run.id,
+            run.status,
+            run.url,
+            workflow.id,
+            started_at
+          )
+        )
+  psycopg2.extras.execute_values(cursor, insert_workflows_query, 
insert_workflows)
+  psycopg2.extras.execute_values(cursor, insert_workflow_runs_query, 
insert_workflow_runs)
   cursor.close()
   connection.commit()
   connection.close()

Review Comment:
   Circling back, I see that you mention a separate fetcher. We will modify
the fetcher so that there is only one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to