reubenvanammers commented on a change in pull request #4156: [AIRFLOW-3314] 
Changed auto inlets feature to work as described
URL: https://github.com/apache/incubator-airflow/pull/4156#discussion_r236112185
 
 

 ##########
 File path: airflow/lineage/__init__.py
 ##########
 @@ -110,26 +114,31 @@ def wrapper(self, context, *args, **kwargs):
                       for i in inlets]
             self.inlets.extend(inlets)
 
-        if self._inlets['auto']:
-            # dont append twice
-            task_ids = set(self._inlets['task_ids']).symmetric_difference(
-                self.upstream_task_ids
-            )
-            inlets = self.xcom_pull(context,
-                                    task_ids=task_ids,
-                                    dag_id=self.dag_id,
-                                    key=PIPELINE_OUTLETS)
-            inlets = [item for sublist in inlets if sublist for item in sublist]
-            inlets = [DataSet.map_type(i['typeName'])(data=i['attributes'])
-                      for i in inlets]
-            self.inlets.extend(inlets)
-
-        if len(self._inlets['datasets']) > 0:
-            self.inlets.extend(self._inlets['datasets'])
+        if self._inlets["auto"]:
+            visited_task_ids = set(self._inlets["task_ids"])  # prevent double counting of outlets
+            stack = {self.task_id}
+            while stack:
+                task_id = stack.pop()
+                task = self._dag.task_dict[task_id]
+                visited_task_ids.add(task_id)
+                inlets = self.xcom_pull(
 
 Review comment:
   Hi @bolkedebruin, can you elaborate on your objection? Once the code reaches 
a node of the DAG and finds outlets, it stops searching further upstream, 
because that node's upstream tasks are not placed on the stack. If, for 
example, the task has two upstream tasks, then xcom_pull will be called twice 
rather than once as in the previous code. However, since xcom_pull fires off a 
separate database call (using get_one) for each task_id in the task_ids 
parameter, I don't think this should make an appreciable difference in 
speed. 
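   
   A minimal sketch of the traversal described above, assuming a plain dict 
in place of the DAG and a hypothetical `pull_outlets` callable in place of 
xcom_pull; none of these names come from Airflow. Unlike the diff, it starts 
from the direct upstream tasks rather than from the task itself. It counts 
lookups to show that there is one per visited task and that a branch is not 
followed past the first task with outlets.

```python
def find_auto_inlets(task_id, upstream, pull_outlets, skip=()):
    """Collect outlets from the nearest upstream tasks that have any.

    upstream:     dict mapping task_id -> list of direct upstream task_ids
    pull_outlets: hypothetical stand-in for a per-task xcom_pull
    skip:         task_ids whose outlets were already added explicitly
    """
    visited = set(skip)
    stack = list(upstream.get(task_id, []))
    inlets = []
    calls = 0
    while stack:
        current = stack.pop()
        if current in visited:
            continue
        visited.add(current)
        calls += 1  # one lookup per visited task
        found = pull_outlets(current)
        if found:
            # Outlets found: do not push this task's upstream tasks.
            inlets.extend(found)
        else:
            # Nothing here: keep searching further up this branch.
            stack.extend(upstream.get(current, []))
    return inlets, calls


# Hypothetical DAG: extract -> clean -> load and extract -> enrich -> load
upstream = {"load": ["clean", "enrich"],
            "clean": ["extract"],
            "enrich": ["extract"]}
outlets = {"clean": ["clean.csv"], "extract": ["raw.csv"]}

inlets, calls = find_auto_inlets("load", upstream,
                                 lambda t: outlets.get(t, []))
```

   Here `extract` is reached only through `enrich`, which has no outlets of 
its own; the `clean` branch stops at `clean`.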
   
   Regarding something like a topological sort, I feel that it would lose too 
much information: what I regard as the 'expected' behaviour requires 
distinguishing between upstream and sibling nodes, since you want to stop when 
outlets from a branch are found. Again, since that would also do individual 
database queries, I'm not sure that it would be more efficient.
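   
   To illustrate the lost information: the same topological order can be 
valid for two differently shaped DAGs whose expected inlets differ. The sketch 
below is only an illustration, using hypothetical task names and a plain dict 
of upstream edges rather than Airflow objects.

```python
def is_topological_order(order, upstream):
    """True if every task appears after all of its direct upstream tasks."""
    position = {task: i for i, task in enumerate(order)}
    return all(position[up] < position[task]
               for task, ups in upstream.items() for up in ups)


def nearest_outlets(task_id, upstream, outlets):
    """Walk upstream, stopping on each branch at the first task with outlets."""
    stack, seen, found = list(upstream.get(task_id, [])), set(), []
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        if outlets.get(current):
            found.extend(outlets[current])
        else:
            stack.extend(upstream.get(current, []))
    return sorted(found)


order = ["a", "b", "c"]
outlets = {"a": ["a.out"], "b": ["b.out"]}
chain = {"b": ["a"], "c": ["b"]}   # a -> b -> c
fan_in = {"c": ["a", "b"]}         # a -> c and b -> c

# The flat order is valid for both shapes ...
assert is_topological_order(order, chain)
assert is_topological_order(order, fan_in)

# ... but the inlets expected for "c" differ.
chain_inlets = nearest_outlets("c", chain, outlets)    # only b's outlets
fan_in_inlets = nearest_outlets("c", fan_in, outlets)  # a's and b's outlets
```

   The order alone cannot tell whether `a` sits behind `b` or beside it, so 
the edges are still needed to decide where to stop.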
   
   I could change it so that it doesn't do a database query on the task, but 
that's a relatively small change. 
   
   Thanks for looking over the PR, and tell me if I have misunderstood 
something.
