dheerajturaga commented on code in PR #61155:
URL: https://github.com/apache/airflow/pull/61155#discussion_r2825554932


##########
providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py:
##########
@@ -61,56 +58,17 @@ def __init__(self, parallelism: int = PARALLELISM):
         super().__init__(parallelism=parallelism)
         self.last_reported_state: dict[TaskInstanceKey, TaskInstanceState] = {}
 
-    def _check_db_schema(self, engine: Engine) -> None:
-        """
-        Check if already existing table matches the newest table schema.
-
-        workaround as Airflow 2.x had no support for provider DB migrations,
-        then it is possible to use alembic also for provider distributions.
-
-        TODO(jscheffl): Change to alembic DB migrations in the future.
-        """
-        inspector = inspect(engine)
-        edge_job_columns = None
-        edge_job_command_len = None
-        with contextlib.suppress(NoSuchTableError):
-            edge_job_schema = inspector.get_columns("edge_job")
-            edge_job_columns = [column["name"] for column in edge_job_schema]
-            for column in edge_job_schema:
-                if column["name"] == "command":
-                    edge_job_command_len = column["type"].length  # type: 
ignore[attr-defined]
-
-        # version 0.6.0rc1 added new column concurrency_slots
-        if edge_job_columns and "concurrency_slots" not in edge_job_columns:
-            EdgeJobModel.metadata.drop_all(engine, 
tables=[EdgeJobModel.__table__])
-
-        # version 1.1.0 the command column was changed to VARCHAR(2048)
-        elif edge_job_command_len and edge_job_command_len != 2048:
-            with Session(engine) as session:
-                query = "ALTER TABLE edge_job ALTER COLUMN command TYPE 
VARCHAR(2048);"
-                session.execute(text(query))
-                session.commit()
-
-        edge_worker_columns = None
-        with contextlib.suppress(NoSuchTableError):
-            edge_worker_columns = [column["name"] for column in 
inspector.get_columns("edge_worker")]
-
-        # version 0.14.0pre0 added new column maintenance_comment
-        if edge_worker_columns and "maintenance_comment" not in 
edge_worker_columns:
-            with Session(engine) as session:
-                query = "ALTER TABLE edge_worker ADD maintenance_comment 
VARCHAR(1024);"
-                session.execute(text(query))
-                session.commit()
-
     @provide_session
     def start(self, session: Session = NEW_SESSION):
         """If EdgeExecutor provider is loaded first time, ensure table 
exists."""
+        from airflow.providers.edge3.models.db import EdgeDBManager

Review Comment:
   Fixed this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to