reubenvanammers commented on a change in pull request #4156: [AIRFLOW-3314]
Changed auto inlets feature to work as described
URL: https://github.com/apache/incubator-airflow/pull/4156#discussion_r236112185
##########
File path: airflow/lineage/__init__.py
##########
@@ -110,26 +114,31 @@ def wrapper(self, context, *args, **kwargs):
for i in inlets]
self.inlets.extend(inlets)
- if self._inlets['auto']:
- # dont append twice
- task_ids = set(self._inlets['task_ids']).symmetric_difference(
- self.upstream_task_ids
- )
- inlets = self.xcom_pull(context,
- task_ids=task_ids,
- dag_id=self.dag_id,
- key=PIPELINE_OUTLETS)
- inlets = [item for sublist in inlets if sublist for item in sublist]
- inlets = [DataSet.map_type(i['typeName'])(data=i['attributes'])
- for i in inlets]
- self.inlets.extend(inlets)
-
- if len(self._inlets['datasets']) > 0:
- self.inlets.extend(self._inlets['datasets'])
+ if self._inlets["auto"]:
+ visited_task_ids = set(self._inlets["task_ids"]) # prevent double counting of outlets
+ stack = {self.task_id}
+ while stack:
+ task_id = stack.pop()
+ task = self._dag.task_dict[task_id]
+ visited_task_ids.add(task_id)
+ inlets = self.xcom_pull(
Review comment:
Hi @bolkedebruin, can you elaborate on your objection? Once the code reaches
a node of the DAG and finds outlets, it stops searching further upstream on
that branch, because that node's upstream tasks are not pushed onto the stack.
If, for example, the task has two upstream tasks, then xcom_pull is called
twice rather than once as in the previous code. However, since xcom_pull fires
off a separate database call (using get_one) for each task_id in the task_ids
parameter, I don't think this makes an appreciable difference in speed.
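To make the traversal concrete, here is a minimal sketch of the idea, not the
PR code itself. It assumes a plain dict `upstream` standing in for the DAG and
a dict `outlets` standing in for one xcom_pull per task; the function name
`collect_auto_inlets` and the task names are hypothetical.

```python
def collect_auto_inlets(task_id, upstream, outlets, skip=()):
    """Walk upstream from task_id, stopping on each branch once outlets are found.

    upstream: dict of task id -> list of direct upstream task ids
              (hypothetical stand-in for the DAG).
    outlets:  dict of task id -> list of outlets
              (hypothetical stand-in for one xcom_pull lookup per task).
    skip:     task ids that are already handled explicitly.
    Returns the outlets found and the number of lookups performed.
    """
    visited = set(skip)
    stack = {task_id}
    found = []
    lookups = 0
    while stack:
        current = stack.pop()
        visited.add(current)
        lookups += 1  # one lookup per visited task, including the start task
        current_outlets = outlets.get(current, [])
        if current_outlets:
            # Outlets found: do not push this task's upstream tasks.
            found.extend(current_outlets)
            continue
        for parent in upstream.get(current, []):
            if parent not in visited:
                stack.add(parent)
    return found, lookups


# "load" has two upstream branches; branch a already has outlets at "clean_a".
upstream = {
    "load": ["clean_a", "clean_b"],
    "clean_a": ["extract_a"],
    "clean_b": ["extract_b"],
    "extract_a": [],
    "extract_b": [],
}
outlets = {"clean_a": ["file_a"], "extract_a": ["raw_a"], "extract_b": ["raw_b"]}

found, lookups = collect_auto_inlets("load", upstream, outlets)
print(sorted(found), lookups)  # ['file_a', 'raw_b'] 4
```

In this sketch "extract_a" is never looked up, because "clean_a" already
supplied outlets for that branch, while branch b keeps going until it reaches
"extract_b".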
Regarding something like a topological sort, I feel that it would lose too
much information: what I regard as the 'expected' behaviour requires
distinguishing between upstream and sibling nodes, because you want to stop
once outlets are found on a branch. Again, since that approach would also
issue individual database queries, I'm not sure it would be more efficient.
I could change it so that it doesn't run a database query for the task itself,
but that's a relatively small change.
Thanks for looking over the PR, and let me know if I have misunderstood
something.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services