bolkedebruin commented on code in PR #29355:
URL: https://github.com/apache/airflow/pull/29355#discussion_r1100495876
##########
airflow/models/taskinstance.py:
##########
@@ -532,6 +560,42 @@ def __init__(
# can be changed when calling 'run'
self.test_mode = False
+ def _init_from_dict(self, ti_dict: dict[str, Any]):
+ self.__dict__ = ti_dict.copy()
+ self.init_on_load()
+
+ @staticmethod
+ def from_task(
+ task: Operator,
+ execution_date: datetime | None = None,
+ run_id: str | None = None,
+ state: str | None = None,
+ map_index: int = -1,
+ ):
+ """
+ Create TaskInstance from task operator.
+
+ :param task: task's Operator object.
+ :param execution_date: Optional execution time of the task.
+ :param run_id: Optional DAG run ID for the task.
+ :param state: Optional state of the task.
+ :param map_index: Optional map index. Defaults to -1 (non-mapped task).
+ """
+ return TaskInstance(task, execution_date, run_id, state, map_index)
+
+ @staticmethod
+ def deserialize(ti_dict: dict[str, Any]) -> TaskInstance:
+ """Create TaskInstance from dictionary."""
+ return TaskInstance(ti_dict=ti_dict)
Review Comment:
Why do this in the constructor? It seems to overload it and only calls
`_init_from_dict` which does a dict copy and calls `init_on_load`? I would
expect something like
```
def deserialize(data: dict[str, Any], version: int) -> TaskInstance:
if version > TaskInstance.__version__:
raise TypeError("version too big, dont know hot to deserialize")
ti = TaskInstance()
ti.__dict__ = data.copy()
ti.init_on_load()
return ti
```
##########
airflow/models/taskinstance.py:
##########
@@ -532,6 +560,42 @@ def __init__(
# can be changed when calling 'run'
self.test_mode = False
+ def _init_from_dict(self, ti_dict: dict[str, Any]):
+ self.__dict__ = ti_dict.copy()
+ self.init_on_load()
+
+ @staticmethod
+ def from_task(
+ task: Operator,
+ execution_date: datetime | None = None,
+ run_id: str | None = None,
+ state: str | None = None,
+ map_index: int = -1,
+ ):
+ """
+ Create TaskInstance from task operator.
+
+ :param task: task's Operator object.
+ :param execution_date: Optional execution time of the task.
+ :param run_id: Optional DAG run ID for the task.
+ :param state: Optional state of the task.
+ :param map_index: Optional map index. Defaults to -1 (non-mapped task).
+ """
+ return TaskInstance(task, execution_date, run_id, state, map_index)
+
+ @staticmethod
+ def deserialize(ti_dict: dict[str, Any]) -> TaskInstance:
Review Comment:
deserialize takes two arguments: data: dict[str, Any] and version: int
##########
airflow/models/taskinstance.py:
##########
@@ -470,13 +469,42 @@ class TaskInstance(Base, LoggingMixin):
Review Comment:
here add a
`__version__: ClassVar[int] = 1`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]