kacpermuda commented on code in PR #41034:
URL: https://github.com/apache/airflow/pull/41034#discussion_r1692580612


##########
airflow/lineage/hook.py:
##########
@@ -106,7 +130,13 @@ def add_input_dataset(
             scheme=scheme, uri=uri, dataset_kwargs=dataset_kwargs, dataset_extra=dataset_extra
         )
         if dataset:
-            self.inputs.append((dataset, context))
+            key = self._generate_key(dataset, context)
+            if key not in self._inputs:
+                self._inputs[key] = (dataset, context)
+                self._input_counts[key] = 1
+            else:
+                self._input_counts[key] += 1
+                self._inputs[key] = (dataset, context)

Review Comment:
   If we go with `defaultdict(int)` in `__init__`:
   
   ```
   self._input_counts: dict[str, int] = defaultdict(int)
   self._output_counts: dict[str, int] = defaultdict(int)
   ```
   then we could probably skip the `if` here
   ```suggestion
               key = self._generate_key(dataset, context)
               self._inputs[key] = (dataset, context)
               self._input_counts[key] += 1
   ```
   
   Even if we don't switch to `defaultdict`, `self._inputs[key] = (dataset, context)` can probably be moved outside the `if`, since both branches execute it.
   My first thought was to assign it only when the key is not already in inputs and otherwise simply increase the count, but I'm not sure we can be certain that the LineageContext content hasn't changed. Is that why we re-assign it?
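
For reference, the `defaultdict(int)` idea above can be sketched in isolation. This is a minimal, self-contained illustration, not the code from the PR: `CollectorSketch` is a hypothetical stand-in for the collector class, datasets are plain strings, and the body of `_generate_key` is an assumption made only so that the example runs.

```python
from __future__ import annotations

from collections import defaultdict


class CollectorSketch:
    """Hypothetical stand-in for the collector in airflow/lineage/hook.py."""

    def __init__(self) -> None:
        self._inputs: dict[str, tuple[str, object]] = {}
        # defaultdict(int) returns 0 for a missing key, so `+= 1` also works
        # the first time a key is seen and no `if key not in ...` is needed.
        self._input_counts: dict[str, int] = defaultdict(int)

    def _generate_key(self, dataset: str, context: object) -> str:
        # Assumption: the real key generation differs; it only has to be
        # stable for the same (dataset, context) pair.
        return f"{dataset}_{id(context)}"

    def add_input_dataset(self, dataset: str, context: object) -> None:
        key = self._generate_key(dataset, context)
        # Always re-assign, so the most recent context is the one stored.
        self._inputs[key] = (dataset, context)
        self._input_counts[key] += 1
```

Adding the same dataset with the same context twice leaves one entry in `_inputs` and a count of 2 for its key.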



##########
airflow/lineage/hook.py:
##########
@@ -121,17 +151,32 @@ def add_output_dataset(
             scheme=scheme, uri=uri, dataset_kwargs=dataset_kwargs, dataset_extra=dataset_extra
         )
         if dataset:
-            self.outputs.append((dataset, context))
+            key = self._generate_key(dataset, context)
+            if key not in self._outputs:
+                self._outputs[key] = (dataset, context)
+                self._output_counts[key] = 1
+            else:
+                self._output_counts[key] += 1
+                self._outputs[key] = (dataset, context)
 
     @property
     def collected_datasets(self) -> HookLineage:
         """Get the collected hook lineage information."""
-        return HookLineage(self.inputs, self.outputs)
+        return HookLineage(
+            [
+                {"dataset": dataset, "count": self._input_counts[key], "context": context}

Review Comment:
   From the perspective of a lineage consumer, I think a dedicated type would work better than a generic dictionary, which can hold arbitrary keys and gives no clear type hints. How about a dedicated class such as `CollectedDataset` or `CollectedLineage`?
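
A rough sketch of what such a class could look like, assuming a frozen dataclass whose fields mirror the three dictionary keys in the diff above. The name `CollectedDataset` is one of the two options suggested here; the `Any` field types and the `to_collected` helper are hypothetical and only illustrate the shape.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class CollectedDataset:
    """Typed replacement for the {"dataset", "count", "context"} dict."""

    dataset: Any  # assumption: the real field would be typed as Dataset
    count: int
    context: Any  # assumption: the real field would be typed as LineageContext


def to_collected(
    entries: dict[str, tuple[Any, Any]], counts: dict[str, int]
) -> list[CollectedDataset]:
    # Hypothetical helper: pair each stored (dataset, context) with its count.
    return [
        CollectedDataset(dataset=dataset, count=counts[key], context=context)
        for key, (dataset, context) in entries.items()
    ]
```

Consumers would then read `item.count` instead of `item["count"]`, and a misspelled field name fails in static type checking rather than at runtime.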



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.