hussein-awala commented on code in PR #32646:
URL: https://github.com/apache/airflow/pull/32646#discussion_r1272826215


##########
airflow/utils/log/file_task_handler.py:
##########
@@ -182,14 +183,20 @@ def set_context(self, ti: TaskInstance) -> None | SetContextPropagate:
         functionality is only used in unit testing.
 
         :param ti: task instance object
+        :param identifier: if set, adds suffix to log file. For use when shipping exceptional messages
+            to task logs from context other than task or trigger run
         """
-        local_loc = self._init_file(ti)
+        local_loc = self._init_file(ti, identifier=identifier)
         self.handler = NonCachingFileHandler(local_loc, encoding="utf-8")
         if self.formatter:
             self.handler.setFormatter(self.formatter)
         self.handler.setLevel(self.level)
         return SetContextPropagate.MAINTAIN_PROPAGATE if self.maintain_propagate else None
 
+    @cached_property
+    def supports_task_log_ship(self):

Review Comment:
   ```suggestion
       def supports_task_log_ship(self) -> bool:
   ```



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -218,9 +225,17 @@ def close(self):
         if self.handler:
             self.handler.close()
 
-    def _render_filename(self, ti: TaskInstance, try_number: int) -> str:
+    def _render_filename(self, ti_or_ti_key: TaskInstance | TaskInstanceKey, try_number: int) -> str:
         """Returns the worker log filename."""
+        from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
+
         with create_session() as session:
+            if isinstance(ti_or_ti_key, TaskInstanceKey):
+                ti = TaskInstance.get_from_key(ti_or_ti_key, session)
+                if not ti:
+                    raise ValueError("TaskInstance not found")

Review Comment:
   It would be better to raise an Airflow exception here instead of `ValueError`, so the user can tell that the error comes from Airflow and not from a third-party library.
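
A minimal sketch of what this comment asks for, written so it runs without Airflow installed. The `AirflowException` class below is a local stand-in for `airflow.exceptions.AirflowException`, and `resolve_task_instance` with its dictionary lookup is a hypothetical helper that only mirrors the `if not ti:` branch from the diff. Neither is taken from the PR.

```python
from __future__ import annotations


class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException (assumption: Airflow is not imported here)."""


def resolve_task_instance(ti_key: str, known: dict[str, object]) -> object:
    """Hypothetical lookup mirroring the `if not ti: raise ...` branch in the diff."""
    ti = known.get(ti_key)
    if not ti:
        # A project-specific exception type lets callers separate Airflow
        # errors from errors raised by third-party libraries.
        raise AirflowException(f"TaskInstance not found for key {ti_key!r}")
    return ti
```

Callers can then catch `AirflowException` (or a more specific subclass, if the project defines one) without also swallowing unrelated `ValueError`s.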



##########
airflow/providers/microsoft/azure/log/wasb_task_handler.py:
##########
@@ -92,7 +92,7 @@ def hook(self):
             )
             return None
 
-    def set_context(self, ti) -> None:
+    def set_context(self, ti):

Review Comment:
   Shouldn't `identifier` be added to `set_context` in all the task handlers? Or am I missing something? :thinking:
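
One way the concern above could be addressed, sketched with stand-in classes so it runs on its own. `BaseTaskHandler` and `RemoteTaskHandler` are hypothetical names, the keyword-only `identifier` parameter is an assumption about the final signature, and the file-naming scheme is invented for illustration. The point is only that an overriding handler keeps its type hints and forwards `identifier` to the parent.

```python
from __future__ import annotations


class BaseTaskHandler:
    """Hypothetical stand-in for the base file handler; only the signature matters here."""

    def set_context(self, ti: object, *, identifier: str | None = None) -> None:
        # Assumed naming: append the identifier as a suffix when it is set.
        self.log_name = f"{ti}.log" if identifier is None else f"{ti}.{identifier}.log"


class RemoteTaskHandler(BaseTaskHandler):
    """Hypothetical provider handler that keeps its annotations and forwards `identifier`."""

    def set_context(self, ti: object, *, identifier: str | None = None) -> None:
        # Forwarding keeps the override compatible with callers that pass identifier.
        super().set_context(ti, identifier=identifier)
        self.remote_name = f"remote/{self.log_name}"
```

If an override omits the parameter, a caller passing `identifier=...` to that handler would fail with a `TypeError`, which is the risk the question points at.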



##########
airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -321,7 +321,7 @@ def emit(self, record):
             setattr(record, self.offset_field, int(time() * (10**9)))
             self.handler.emit(record)
 
-    def set_context(self, ti: TaskInstance) -> None:
+    def set_context(self, ti):

Review Comment:
   Why do you need to remove the type hints?



##########
airflow/models/taskinstance.py:
##########
@@ -878,6 +878,19 @@ def refresh_from_db(self, session: Session = NEW_SESSION, lock_for_update: bool
         else:
             self.state = None
 
+    @classmethod
+    def get_from_key(cls, ti_key, session) -> TaskInstance | None:

Review Comment:
   IMHO `get_by_key` is a more suitable name:
   ```suggestion
       def get_by_key(cls, ti_key, session) -> TaskInstance | None:
   ```


