ashb commented on a change in pull request #21495:
URL: https://github.com/apache/airflow/pull/21495#discussion_r805963797



##########
File path: airflow/models/mappedoperator.py
##########
@@ -196,6 +196,9 @@ class MappedOperator(AbstractOperator):
     upstream_task_ids: Set[str] = attr.ib(factory=set, init=False)
     downstream_task_ids: Set[str] = attr.ib(factory=set, init=False)
 
+    inlets: list = attr.ib(factory=list)
+    outlets: list = attr.ib(factory=list)

Review comment:
      Properties looking at where? `partial_kwargs`?
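    Something like this, maybe (just a sketch of the idea, not the PR's actual implementation; it assumes `partial_kwargs` is an ordinary dict on `MappedOperator` and uses a made-up toy class name):

    ```
    import attr


    @attr.s(auto_attribs=True)
    class MappedOperatorSketch:
        """Toy stand-in, only to show the property-based alternative."""

        partial_kwargs: dict = attr.ib(factory=dict)

        @property
        def inlets(self) -> list:
            # Read through to whatever .partial() captured, defaulting to empty.
            return self.partial_kwargs.get("inlets", [])

        @property
        def outlets(self) -> list:
            return self.partial_kwargs.get("outlets", [])


    op = MappedOperatorSketch(partial_kwargs={"inlets": ["some_dataset"]})
    assert op.inlets == ["some_dataset"] and op.outlets == []
    ```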

##########
File path: airflow/config_templates/config.yml
##########
@@ -598,7 +598,9 @@
       version_added: 2.0.0
       type: string
       example: ~
-      default: "{{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log"
+      default: "dag_id={{{{ ti.dag_id }}}}/run_id={{{{ ti.run_id }}}}/task_id={{{{ ti.task_id }}}}/\
+               {{%% if ti.map_index != -1 %%}}map_index={{{{ ti.map_index }}}}/{{%% endif %%}}\

Review comment:
       `map_index >= 0` in this case, but yeah good idea.
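    Concretely, the conditional chunk of the default would then read something like this (a sketch of the intent only, keeping the config.yml escaping; not necessarily the final wording):

    ```
    {{%% if ti.map_index >= 0 %%}}map_index={{{{ ti.map_index }}}}/{{%% endif %%}}
    ```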

##########
File path: airflow/models/taskinstance.py
##########
@@ -1836,7 +1838,7 @@ def get_template_context(
             params.update(task.params)
         if conf.getboolean('core', 'dag_run_conf_overrides_params'):
             self.overwrite_params_with_dag_run_conf(params=params, dag_run=dag_run)
-        validated_params = task.params = params.validate()
+        validated_params = params.validate()

Review comment:
       I forget the error, but setting it here was giving some kind of error (I 
think from it being set _twice_ on the same in-memory object, once in the 
supervisor, and once in the actual runner.)
   
   So I took the approach that `get_*` should never have any side effects!
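   Roughly this shape, in other words (a simplified sketch with a made-up function name, not the real `get_template_context` body; `params` is assumed to be the params object built above):

    ```
    def build_context(task, params):
        # Validate into a local value and return it; never assign back onto
        # `task`, so running this twice against the same in-memory objects
        # (once in the supervisor, once in the runner) stays safe.
        validated_params = params.validate()
        return {"params": validated_params}
    ```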

##########
File path: airflow/decorators/base.py
##########
@@ -309,6 +309,8 @@ def map(self, **kwargs) -> XComArg:
             partial_kwargs.get("retry_delay", DEFAULT_RETRY_DELAY),
         )
         partial_kwargs.setdefault("executor_config", {})
+        partial_kwargs.setdefault("inlets", [])
+        partial_kwargs.setdefault("outlets", [])

Review comment:
       No, cos of this in BaseOperator init:
   
    ```
            # Lineage
            self.inlets: List = []
            self.outlets: List = []
    ```

##########
File path: airflow/sensors/smart_sensor.py
##########
@@ -341,21 +340,21 @@ def _load_sensor_works(self, session=None):
                 .filter(SI.state == State.SENSING)
                 .filter(SI.shardcode < self.shard_max, SI.shardcode >= self.shard_min)
             )
-            tis = query.all()
+            sis = query.all()
 
-        self.log.info("Performance query %s tis, time: %.3f", len(tis), 
timer.duration)
+        self.log.info("Performance query %s sis, time: %.3f", len(sis), 
timer.duration)
 
         # Query without checking dagrun state might keep some failed dag_run tasks alive.
         # Join with DagRun table will be very slow based on the number of sensor tasks we
         # need to handle. We query all smart tasks in this operator
         # and expect scheduler correct the states in _change_state_for_tis_without_dagrun()
 
         sensor_works = []
-        for ti in tis:
+        for si in sis:
             try:
-                sensor_works.append(SensorWork(ti))
+                sensor_works.append(SensorWork(si))
             except Exception:
-                self.log.exception("Exception at creating sensor work for ti %s", ti.key)
+                self.log.exception("Exception at creating sensor work for ti %s", si.ti_key)

Review comment:
       It does, yes.

##########
File path: docs/apache-airflow/logging-monitoring/logging-tasks.rst
##########
@@ -38,7 +38,12 @@ directory.
 .. note::
     For more information on setting the configuration, see :doc:`/howto/set-config`
 
-The following convention is followed while naming logs: ``{dag_id}/{task_id}/{logical_date}/{try_number}.log``
+The default pattern is followed while naming log files form tasks:
+
+- For normal tasks: ``dag_id{dag_id}/run_id={run_id}/task_id={task_id}/attempt={try_number}.log``.
+- For dynamically mapped tasks: ``dag_id{dag_id}/run_id={run_id}/task_id={task_id}/map_index={map_index}/attempt={try_number}.log``.

Review comment:
       🤦🏻 

##########
File path: airflow/decorators/base.py
##########
@@ -309,6 +309,8 @@ def map(self, **kwargs) -> XComArg:
             partial_kwargs.get("retry_delay", DEFAULT_RETRY_DELAY),
         )
         partial_kwargs.setdefault("executor_config", {})
+        partial_kwargs.setdefault("inlets", [])
+        partial_kwargs.setdefault("outlets", [])

Review comment:
       Or not. I was wrong and these aren't needed at all actually.

##########
File path: airflow/sensors/smart_sensor.py
##########
@@ -341,21 +340,21 @@ def _load_sensor_works(self, session=None):
                 .filter(SI.state == State.SENSING)
                 .filter(SI.shardcode < self.shard_max, SI.shardcode >= self.shard_min)
             )
-            tis = query.all()
+            sis = query.all()
 
-        self.log.info("Performance query %s tis, time: %.3f", len(tis), timer.duration)
+        self.log.info("Performance query %s sis, time: %.3f", len(sis), timer.duration)
 
         # Query without checking dagrun state might keep some failed dag_run tasks alive.
         # Join with DagRun table will be very slow based on the number of sensor tasks we
         # need to handle. We query all smart tasks in this operator
         # and expect scheduler correct the states in _change_state_for_tis_without_dagrun()
 
         sensor_works = []
-        for ti in tis:
+        for si in sis:
             try:
-                sensor_works.append(SensorWork(ti))
+                sensor_works.append(SensorWork(si))
             except Exception:
-                self.log.exception("Exception at creating sensor work for ti %s", ti.key)
+                self.log.exception("Exception at creating sensor work for ti %s", si.ti_key)

Review comment:
       Er, no. Reverted this change. (I was thinking of the wrong class.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

