jedcunningham commented on code in PR #50788:
URL: https://github.com/apache/airflow/pull/50788#discussion_r2096414778


##########
airflow-core/src/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py:
##########
@@ -77,9 +77,24 @@ def upgrade():
     condition = condition_templates.get(dialect)
     if not condition:
         raise RuntimeError(f"Unsupported dialect: {dialect}")
-
     # Key is a reserved keyword in MySQL, so we need to quote it
     quoted_key = conn.dialect.identifier_preparer.quote("key")
+    if dialect == "postgresql":
+        curr_timeout = (
+            int(
+                list(  # noqa: RUF015
+                    conn.execute(
+                        text("""select setting
+        from pg_settings
+        where name = 'statement_timeout'""")
+                    )
+                )[0][0]
+            )
+            / 1000
+        )

Review Comment:
   This is a little unwieldy; can it be simplified?
   
   ```suggestion
           curr_timeout = int(
               conn.execute(
                   text("""select setting from pg_settings where name = 'statement_timeout'""")
               ).scalar_one()
           ) / 1000
   ```
   
   Something like this maybe (untested)?
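To make the intent of the suggestion concrete, here is a minimal, runnable sketch comparing the two styles. It is not the migration code: it uses the standard-library `sqlite3` module with a stand-in `pg_settings` table, and the `scalar_one` helper below is hypothetical, written only to mimic the "exactly one row, first column" behaviour that SQLAlchemy's `Result.scalar_one()` provides. The value `30000` is made up for illustration.

```python
import sqlite3


def scalar_one(cursor):
    """Hypothetical helper: return the first column of exactly one row, else raise."""
    rows = cursor.fetchmany(2)
    if len(rows) != 1:
        raise LookupError("expected exactly one row")
    return rows[0][0]


conn = sqlite3.connect(":memory:")
# Stand-in for PostgreSQL's pg_settings view (assumption: setting is text, in ms).
conn.execute("CREATE TABLE pg_settings (name TEXT, setting TEXT)")
conn.execute("INSERT INTO pg_settings VALUES ('statement_timeout', '30000')")

query = "select setting from pg_settings where name = 'statement_timeout'"

# Style from the diff: materialize all rows, then index into the first one.
verbose = int(list(conn.execute(query))[0][0]) / 1000

# Style from the suggestion: ask for the single value directly.
simplified = int(scalar_one(conn.execute(query))) / 1000
```

Both expressions yield the timeout in seconds; the second also fails loudly if the query unexpectedly returns zero or several rows, instead of raising a bare `IndexError` or silently picking the first row.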



##########
airflow-core/src/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py:
##########
@@ -61,12 +74,20 @@ def upgrade():
             """)
         )
     else:
-        BATCH_SIZE = 100
+        BATCH_SIZE = 1000
         offset = 0
         while True:
+            err_count = 0
+            batch_num = offset + 1
+            print(f"converting dag run conf. batch={batch_num}")
             rows = conn.execute(
                 text(
-                    f"SELECT id,conf FROM dag_run WHERE conf IS not NULL order by id LIMIT {BATCH_SIZE} OFFSET {offset}"
+                    "SELECT id, conf "
+                    "FROM dag_run "
+                    "WHERE conf IS not NULL "
+                    f"AND conf != {empty_val}"

Review Comment:
   Can we just ignore them rather than updating them to `{}`?
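A rough sketch of what skipping empty confs at query time looks like, using only the standard library. Everything here is an assumption made for illustration: the table layout, the tiny `BATCH_SIZE`, and in particular that an "empty" conf is a pickled empty dict (the definition of `empty_val` is not shown in the hunk). Unlike the diff, errors are counted across the whole run rather than per batch.

```python
import json
import pickle
import sqlite3

BATCH_SIZE = 2  # tiny on purpose so the loop runs more than once

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (id INTEGER PRIMARY KEY, conf BLOB, conf_json TEXT)")

# Assumption: an empty conf is stored as a pickled empty dict.
empty_val = pickle.dumps({})

# The last conf contains a set, which pickles fine but is not JSON-serializable.
confs = [{"a": 1}, {}, None, {"b": [1, 2]}, {}, {"c": {1, 2}}]
for conf in confs:
    blob = None if conf is None else pickle.dumps(conf)
    conn.execute("INSERT INTO dag_run (conf) VALUES (?)", (blob,))

converted = 0
err_count = 0
offset = 0
while True:
    # NULL and empty confs are filtered out here, so they are never touched.
    rows = conn.execute(
        "SELECT id, conf FROM dag_run "
        "WHERE conf IS NOT NULL AND conf != ? "
        "ORDER BY id LIMIT ? OFFSET ?",
        (empty_val, BATCH_SIZE, offset),
    ).fetchall()
    if not rows:
        break
    for row_id, blob in rows:
        try:
            json_data = json.dumps(pickle.loads(blob))
        except Exception:
            err_count += 1
            continue
        conn.execute("UPDATE dag_run SET conf_json = ? WHERE id = ?", (json_data, row_id))
        converted += 1
    offset += BATCH_SIZE
```

Because the loop only writes `conf_json` and never changes `conf`, the set of rows matched by the `WHERE` clause stays stable between batches, so `OFFSET` pagination does not skip anything. Rows that were filtered out simply end up with a NULL `conf_json`.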



##########
airflow-core/docs/img/airflow_erd.sha256:
##########
@@ -1 +1 @@
-4cda55eb221ee0340749c8dd41af7603220d7b300e2e808d733239a86e0c2837
\ No newline at end of file
+80d87cc89c29fd73466a8625f82dfcbff347e9e72fee498bcb61a0acb90bf3a1

Review Comment:
   Hmm, bad conflict resolution? I don't think the other changes should change this.



##########
airflow-core/src/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py:
##########
@@ -85,14 +106,17 @@ def upgrade():
                                             """),
                         {"json_data": json_data, "id": row_id},
                     )
-                except Exception as e:
-                    print(f"Error converting dagrun conf to json for dagrun ID {row_id}: {e}")
+                except Exception:
+                    err_count += 1
                     continue
+            if err_count:
+                print(f"could not convert dag run conf for {err_count} records. batch={batch_num}")
             offset += BATCH_SIZE
 
     op.drop_column("dag_run", "conf")
 
     op.alter_column("dag_run", "conf_json", existing_type=conf_type, new_column_name="conf")
+    print(f"end: {pendulum.now()}")

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
