[ https://issues.apache.org/jira/browse/AIRFLOW-7065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17098545#comment-17098545 ]

ASF GitHub Bot commented on AIRFLOW-7065:
-----------------------------------------

c-wilson commented on a change in pull request #7740:
URL: https://github.com/apache/airflow/pull/7740#discussion_r419148387



##########
File path: airflow/utils/log/logging_mixin.py
##########
@@ -118,6 +129,53 @@ def isatty(self):
         """
         return False
 
+    def add_stream_target(self, target: IOBase):
+        """
+        Adds a stream target to propagate messages to in addition to the provided logger.
+        :param target: File-like object to write to.
+        :return:
+        """
+        if not hasattr(target, 'write'):
+            raise TypeError('Stream target must be writeable.')
+
+        self._additional_stream_targets.append(target)
+
+
+class _StreamToLogRedirector(AbstractContextManager):
+    """Context manager to redirect console stream to a StreamLogWriter"""
+    stream_to_replace: str
+    _existing_stream_target: List[IOBase]
+
+    def __init__(self, logger: logging.Logger, level: int, propagate_to_existing_stream: bool = False):
+        self.propagate_to_existing_stream = propagate_to_existing_stream
+        self._existing_stream_target = []
+        self._replacement_stream = StreamLogWriter(logger, level)
+
+    def __enter__(self):
+        """Saves existing stream target and replaces it with this instance."""
+        existing_stream = getattr(sys, self.stream_to_replace)
+        self._existing_stream_target.append(existing_stream)
+
+        if self.propagate_to_existing_stream:
+            self._replacement_stream.add_stream_target(existing_stream)
+
+        setattr(sys, self.stream_to_replace, self._replacement_stream)
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """Puts back existing stream target"""
+        self._replacement_stream.flush()
+        setattr(sys, self.stream_to_replace, self._existing_stream_target.pop())
+
+
+class StdoutToLog(_StreamToLogRedirector):

Review comment:
       It's possible, but I'm not sure the result would be cleaner. Maybe
you're right. These subclasses just serve as a compact way to pass the right
stream to the StreamLogHandler when propagation is desired. This _could_ live
in task_command.py as if statements if we want.
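
For comparison, here is a minimal standalone sketch of the tee behaviour discussed above, using only the standard library. `TeeLogWriter` and `redirect_to_log` are hypothetical names chosen for illustration; they are not the classes in this PR, and the line-buffering shown is an assumption about how a log-writing stream might behave.

```python
import logging
import sys
from contextlib import contextmanager


class TeeLogWriter:
    """Hypothetical stand-in for a log-writing stream (not the PR's class).

    Logs each complete line and copies raw writes to any extra targets.
    """

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level
        self._targets = []
        self._buffer = ""

    def add_stream_target(self, target):
        if not hasattr(target, "write"):
            raise TypeError("Stream target must be writeable.")
        self._targets.append(target)

    def write(self, message):
        # Copy to the extra targets first, then log complete lines.
        for target in self._targets:
            target.write(message)
        self._buffer += message
        while "\n" in self._buffer:
            line, self._buffer = self._buffer.split("\n", 1)
            self.logger.log(self.level, line)
        return len(message)

    def flush(self):
        # Emit any trailing partial line before flushing the targets.
        if self._buffer:
            self.logger.log(self.level, self._buffer)
            self._buffer = ""
        for target in self._targets:
            target.flush()


@contextmanager
def redirect_to_log(stream_name, logger, level, propagate=False):
    """Swap sys.<stream_name> for a TeeLogWriter, restoring it on exit."""
    existing = getattr(sys, stream_name)
    writer = TeeLogWriter(logger, level)
    if propagate:
        writer.add_stream_target(existing)
    setattr(sys, stream_name, writer)
    try:
        yield writer
    finally:
        writer.flush()
        setattr(sys, stream_name, existing)
```

With a single function parameterised on the stream name, the caller would choose `"stdout"` or `"stderr"` explicitly, which is roughly the "if statements" alternative. Note that the logger passed in must not have a handler that writes back to the redirected stream, or the writes would loop.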






> Give StreamLogHandler ability to tee to log and console
> -------------------------------------------------------
>
>                 Key: AIRFLOW-7065
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-7065
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: logging
>    Affects Versions: 1.10.9
>            Reporter: Christopher
>            Assignee: Christopher
>            Priority: Minor
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> It would be helpful in some use cases (e.g. log aggregation) to have Airflow 
> task output stream to the console as well as go through the Airflow logger. 
> Currently stdout and stderr are captured and sent only to the logger; 
> putting a StreamLogHandler in the logging chain results in infinite 
> recursion.
> I propose adding a configuration flag [core][task_console_output] that would 
> allow messages to be displayed safely in the console as well as propagated 
> to the Airflow logging chain.
> I will put in a PR for this shortly and see how it goes.
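
As a rough illustration of the proposed flag, the sketch below reads a boolean `task_console_output` option from a `[core]` section. It uses the standard library's `configparser` as a stand-in for Airflow's own configuration object, and the helper name `task_console_output_enabled` and the default of `False` are assumptions, not part of the proposal.

```python
import configparser


def task_console_output_enabled(cfg_text):
    """Return the [core] task_console_output flag, defaulting to False.

    Hypothetical helper: parses config text with the stdlib parser rather
    than Airflow's configuration module.
    """
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    return parser.getboolean("core", "task_console_output", fallback=False)
```

Defaulting to `False` would keep the current behaviour (output goes only to the logger) unless the flag is set explicitly.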


