ashb commented on a change in pull request #20722:
URL: https://github.com/apache/airflow/pull/20722#discussion_r780129526



##########
File path: airflow/models/dagrun.py
##########
@@ -804,18 +806,42 @@ def verify_integrity(self, session: Session = NEW_SESSION):
                 ti.state = State.NONE
             session.merge(ti)
 
-        # check for missing tasks
-        for task in dag.task_dict.values():
-            if task.start_date > self.execution_date and not self.is_backfill:
-                continue
+        def task_filter(task: "BaseOperator"):
+            return task.task_id not in task_ids and (
+                self.is_backfill or task.start_date <= self.execution_date
+            )
+
+        created_counts: Dict[str, int] = Counter()

Review comment:
       Yeah, I had it this way before (hence the Counter in the first place), but doing it this way seemed a lot slower. Let me test that again.

##########
File path: airflow/models/dagrun.py
##########
@@ -804,18 +806,42 @@ def verify_integrity(self, session: Session = NEW_SESSION):
                 ti.state = State.NONE
             session.merge(ti)
 
-        # check for missing tasks
-        for task in dag.task_dict.values():
-            if task.start_date > self.execution_date and not self.is_backfill:
-                continue
+        def task_filter(task: "BaseOperator"):
+            return task.task_id not in task_ids and (
+                self.is_backfill or task.start_date <= self.execution_date
+            )
+
+        created_counts: Dict[str, int] = Counter()

Review comment:
       I think the slowdown wasn't from using Counter, but just from having to walk the list of 20k items twice. Anyway, defaultdict it is.
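   A minimal sketch of the two shapes discussed in these comments, assuming a hypothetical `FakeTask` stand-in for `BaseOperator` and counting by a `task_type` attribute (what the PR actually counts is an assumption here). The filter mirrors `task_filter` from the diff. The two-pass version filters first and then feeds the result to `Counter`; the single-pass version filters and counts with a `defaultdict(int)` in one loop. Nothing is benchmarked; the sketch only shows that both give the same result while the second walks the task list once.

```python
from collections import Counter, defaultdict
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Dict, Iterable, List, Set, Tuple


@dataclass
class FakeTask:
    """Hypothetical stand-in for BaseOperator with only the fields used here."""
    task_id: str
    task_type: str
    start_date: datetime


def make_filter(
    task_ids: Set[str], execution_date: datetime, is_backfill: bool
) -> Callable[[FakeTask], bool]:
    # Mirrors task_filter from the diff: skip tasks that already have a TI,
    # and skip tasks that start after this run unless it is a backfill.
    def task_filter(task: FakeTask) -> bool:
        return task.task_id not in task_ids and (
            is_backfill or task.start_date <= execution_date
        )

    return task_filter


def single_pass(
    tasks: Iterable[FakeTask], keep: Callable[[FakeTask], bool]
) -> Tuple[List[str], Dict[str, int]]:
    # One walk over the tasks: collect what to create and count by type together.
    created_counts: Dict[str, int] = defaultdict(int)
    to_create: List[str] = []
    for task in tasks:
        if keep(task):
            created_counts[task.task_type] += 1
            to_create.append(task.task_id)
    return to_create, created_counts


def two_pass(
    tasks: Iterable[FakeTask], keep: Callable[[FakeTask], bool]
) -> Tuple[List[str], Dict[str, int]]:
    # Two walks: filter into a list first, then count that list with Counter.
    selected = [task for task in tasks if keep(task)]
    return [task.task_id for task in selected], Counter(t.task_type for t in selected)
```

   With 20k tasks the difference is one extra iteration over the filtered list, which matches the suspicion in the comment that the extra walk, not `Counter` itself, was the cost.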

##########
File path: airflow/models/taskinstance.py
##########
@@ -477,6 +477,25 @@ def __init__(
         # can be changed when calling 'run'
         self.test_mode = False
 
+    @staticmethod
+    def insert_mapping(run_id: str, task: "BaseOperator") -> dict:
+        """:meta private:"""
+        return {
+            'dag_id': task.dag_id,
+            'task_id': task.task_id,
+            'run_id': run_id,
+            '_try_number': 0,
+            'unixname': getuser(),
+            'queue': task.queue,
+            'pool': task.pool,
+            'pool_slots': task.pool_slots,
+            'priority_weight': task.priority_weight_total,
+            'run_as_user': task.run_as_user,
+            'max_tries': task.retries,
+            'executor_config': task.executor_config,

Review comment:
       Covered by this existing test in test_taskinstances (and I verified that it goes via the insert_mappings path too):
   
   ```python
       def test_ti_updates_with_task(self, create_task_instance, session=None):
           """
           test that updating the executor_config propagates to the TaskInstance DB
           """
           ti = create_task_instance(
               dag_id='test_run_pooling_task',
               task_id='test_run_pooling_task_op',
               executor_config={'foo': 'bar'},
           )
           dag = ti.task.dag
   
           ti.run(session=session)
           tis = dag.get_task_instances()
           assert {'foo': 'bar'} == tis[0].executor_config
   ```
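   To illustrate what a helper like `insert_mapping` is for, here is a rough sketch of the same pattern using the standard-library `sqlite3` module in place of SQLAlchemy: each task becomes a plain dict of column values, and all rows reach the database in a single `executemany` call instead of one ORM object per task. SQLAlchemy's `Session.bulk_insert_mappings()` accepts a list of dicts in a similar way. The table, its columns (a subset of those in the diff), `FakeTask`, the fixed `unixname`, and the JSON encoding of `executor_config` are all assumptions made for this sketch.

```python
import json
import sqlite3
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class FakeTask:
    # Hypothetical stand-in for BaseOperator: only the attributes the mapping reads.
    dag_id: str
    task_id: str
    queue: str = "default"
    pool: str = "default_pool"
    retries: int = 0
    executor_config: Dict[str, Any] = field(default_factory=dict)


def insert_mapping(run_id: str, task: FakeTask, unixname: str) -> Dict[str, Any]:
    """Build one row as a plain dict (a subset of the columns in the diff)."""
    return {
        "dag_id": task.dag_id,
        "task_id": task.task_id,
        "run_id": run_id,
        "try_number": 0,
        "unixname": unixname,
        "queue": task.queue,
        "pool": task.pool,
        "max_tries": task.retries,
        # This toy table stores executor_config as text, so serialize it to JSON.
        "executor_config": json.dumps(task.executor_config),
    }


def bulk_insert(conn: sqlite3.Connection, rows: List[Dict[str, Any]]) -> None:
    # One executemany call with named placeholders instead of one INSERT per object.
    conn.executemany(
        "INSERT INTO task_instance (dag_id, task_id, run_id, try_number, unixname,"
        " queue, pool, max_tries, executor_config)"
        " VALUES (:dag_id, :task_id, :run_id, :try_number, :unixname,"
        " :queue, :pool, :max_tries, :executor_config)",
        rows,
    )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT,"
    " try_number INTEGER, unixname TEXT, queue TEXT, pool TEXT,"
    " max_tries INTEGER, executor_config TEXT)"
)
tasks = [FakeTask("d", f"t{i}", executor_config={"foo": "bar"}) for i in range(3)]
bulk_insert(conn, [insert_mapping("run_1", t, unixname="airflow") for t in tasks])
```

   Reading `executor_config` back and comparing it to `{'foo': 'bar'}` is the toy equivalent of the assertion in the existing test.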

##########
File path: airflow/utils/platform.py
##########
@@ -63,6 +65,7 @@ def get_airflow_git_version():
     return git_version
 
 
+@cache

Review comment:
       I added this as there is no need to call this 20k times when creating a DAG: it can't change at runtime.
   
   (Okay, it _technically_ could change the name at runtime, but that is very much an edge case. Not to mention that the value created in the scheduler doesn't matter: it will be overwritten when the Task executes, and that is the interesting value anyway.)

##########
File path: airflow/utils/platform.py
##########
@@ -63,6 +65,7 @@ def get_airflow_git_version():
     return git_version
 
 
+@cache

Review comment:
       I added this as there is no need to call this 20k times when creating a DAG: it can't change at runtime.
   
   (Okay, the name of the current user _technically_ could change at runtime, but that is very much an edge case. Not to mention that the value created in the scheduler doesn't matter: it will be overwritten when the Task executes, and that is the interesting value anyway.)
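   A small sketch of why the cache helps, assuming a hypothetical `current_username()` in place of the real decorated function. It uses `functools.lru_cache(maxsize=None)`, which behaves like `functools.cache` (Python 3.9+) and also works on older interpreters; where the PR's `cache` decorator is imported from isn't shown in the diff. Building 20,000 row dicts runs the underlying lookup only once.

```python
import functools

# Counts how many times the undecorated body actually runs.
calls = {"n": 0}


@functools.lru_cache(maxsize=None)  # same behaviour as functools.cache on 3.9+
def current_username() -> str:
    """Hypothetical stand-in for the cached username lookup."""
    calls["n"] += 1
    # A real version would ask the OS, e.g. via getpass.getuser().
    return "airflow"


# One row per task, as when creating TaskInstances for a large DAG.
rows = [{"task_id": f"t{i}", "unixname": current_username()} for i in range(20_000)]
```

   The memoized value lives for the whole process, which is the trade-off described in the comment; if it ever needed refreshing, `current_username.cache_clear()` would drop it.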

##########
File path: airflow/utils/platform.py
##########
@@ -63,6 +65,7 @@ def get_airflow_git_version():
     return git_version
 
 
+@cache

Review comment:
       Scratch that: I decided to stop setting the `unixname` attribute on TaskInstances when they are created, as it doesn't make sense to have a value there until they have been executed once.

##########
File path: airflow/utils/platform.py
##########
@@ -63,6 +65,7 @@ def get_airflow_git_version():
     return git_version
 
 
+@cache

Review comment:
       Aaand reverted the "remove setting unixname" change, as the API schema _requires_ a value for unixname, so changing that would be a bigger, separate change.




-- 
This is an automated message from the Apache Git Service.