potiuk commented on code in PR #28818:
URL: https://github.com/apache/airflow/pull/28818#discussion_r1065660545
##########
airflow/utils/log/log_reader.py:
##########
@@ -77,12 +78,16 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di
 metadata.pop("max_offset", None)
 metadata.pop("offset", None)
 metadata.pop("log_pos", None)
-while "end_of_log" not in metadata or (
-    not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
-):
+while True:
     logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
     for host, log in logs[0]:
         yield "\n".join([host or "", log]) + "\n"
+    if "end_of_log" not in metadata or (
+        not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
+    ):
+        time.sleep(0.5)
Review Comment:
Another, more concrete question (writing the questions above down actually
sparked more thinking about this).
What happens if the last few log lines arrive while the thread is sleeping and
the task completes?
As I understand it, this log reader runs in a daemon thread in
base_task_runner, and I think the task runner simply waits for the forked
(or exec'ed) process and exits as soon as that process exits. That also stops
the reader thread immediately, because Python stops daemon threads abruptly at
interpreter shutdown once only daemon threads remain. So any logs produced
during that 0.5 s sleep will never be processed by the reader.
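A minimal, self-contained sketch of the race described above, assuming a simplified model: the child program, its shared `buffer`, and the `run_child()` helper are hypothetical stand-ins for base_task_runner and the log reader, not Airflow code. The child's main thread exits while its daemon reader is in the middle of a 0.5 s sleep, so the reader's next poll never happens.

```python
import subprocess
import sys
import textwrap

# Hypothetical child program modelling the scenario: a daemon "reader" thread
# polls a shared buffer every 0.5 s (mirroring time.sleep(0.5) in the PR),
# while the main thread plays the task runner and returns as soon as the
# "task" is done, without joining the reader.
CHILD_PROGRAM = textwrap.dedent(
    """
    import threading
    import time

    buffer = ["early line"]  # already written before the reader starts
    lock = threading.Lock()

    def reader():
        while True:
            with lock:
                chunk = buffer[:]
                buffer.clear()
            for line in chunk:
                print(line, flush=True)
            time.sleep(0.5)  # polls at roughly t = 0.0, 0.5, 1.0 s

    threading.Thread(target=reader, daemon=True).start()
    time.sleep(0.75)
    with lock:
        buffer.append("last line")  # arrives while the reader is sleeping
    # The main thread ends here; the daemon reader is stopped at shutdown
    # before its next poll at ~1.0 s, so "last line" is never printed.
    """
)


def run_child() -> list[str]:
    """Run the child program and return the lines its reader thread printed."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD_PROGRAM],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.splitlines()
```

On an unloaded machine `run_child()` should return only the early line; the 0.25 s margin on either side of the reader's polls keeps this stable, although heavy load could shift the timing.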
The question is whether we care. I think we already had a subtle race
condition here, because the same thing could occasionally happen if the logs
were written **just** before the task completed. But adding the sleep here
greatly increases the probability of it happening, and I think some tasks will
hit this problem consistently.
So my question is: do we care that we lose the last logs? I am not 100% sure
what the effect will be for the various handlers.
If we do care, I think we should stop using a daemon thread and instead join
the reader thread just before we exit, with another magic timeout of course,
but likely one related to the timeout used in the sleep.
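A minimal sketch of the join-based approach suggested above, assuming the reader can be modelled as a thread that polls a shared in-memory buffer. `PollingLogReader`, `POLL_INTERVAL`, and `demo()` are hypothetical names, not Airflow APIs. Replacing `time.sleep()` with an interruptible `threading.Event.wait()` goes a step beyond what the review proposes.

```python
import threading
import time

POLL_INTERVAL = 0.5  # mirrors time.sleep(0.5) in the PR


class PollingLogReader:
    """Hypothetical reader thread that polls a shared buffer and always does a
    final drain after it is told to stop, so lines written during the last
    sleep are not lost."""

    def __init__(self, buffer: list[str], lock: threading.Lock) -> None:
        self._buffer = buffer
        self._lock = lock
        self._stop_requested = threading.Event()
        self.collected: list[str] = []
        # Non-daemon, as suggested in the review; it is joined explicitly.
        self._thread = threading.Thread(target=self._run, daemon=False)

    def _drain(self) -> None:
        with self._lock:
            self.collected.extend(self._buffer)
            self._buffer.clear()

    def _run(self) -> None:
        # Event.wait() replaces time.sleep(): it returns True as soon as
        # stop() sets the event, instead of finishing the full interval.
        while not self._stop_requested.wait(POLL_INTERVAL):
            self._drain()
        self._drain()  # final read after the task has finished

    def start(self) -> None:
        self._thread.start()

    def stop(self, timeout: float = 2 * POLL_INTERVAL) -> bool:
        """Request a stop, wait up to `timeout` seconds, and report whether
        the reader finished its final drain in time."""
        self._stop_requested.set()
        self._thread.join(timeout)
        return not self._thread.is_alive()


def demo() -> tuple[bool, list[str]]:
    """The last line is appended while the reader is mid-sleep."""
    buffer: list[str] = ["early line"]
    lock = threading.Lock()
    reader = PollingLogReader(buffer, lock)
    reader.start()
    time.sleep(0.75)
    with lock:
        buffer.append("last line")
    finished = reader.stop()  # called just before the task runner exits
    return finished, reader.collected
```

Because `Event.wait()` wakes immediately, the join timeout only has to cover the final drain rather than a whole sleep interval. Note that `join(timeout)` only bounds how long `stop()` blocks: if a non-daemon reader is still alive afterwards, interpreter shutdown will wait for it anyway, so keeping `daemon=True` while still calling `stop()` is the alternative if a stuck reader must never block exit.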
--
This is an automated message from the Apache Git Service.