jedcunningham commented on a change in pull request #21261:
URL: https://github.com/apache/airflow/pull/21261#discussion_r797133065
##########
File path: airflow/providers/elasticsearch/log/es_task_handler.py
##########
@@ -187,15 +187,24 @@ def _read(
metadata['end_of_log'] = False if not logs else len(loading_hosts) == 0
cur_ts = pendulum.now()
- # Assume end of log after not receiving new log for 5 min,
- # as executor heartbeat is 1 min and there might be some
- # delay before Elasticsearch makes the log available.
if 'last_log_timestamp' in metadata:
last_log_ts = timezone.parse(metadata['last_log_timestamp'])
- if (
+
+ # if we are not getting any logs at all after more than N seconds of trying,
+ # assume logs do not exist
+ if int(next_offset) == 0 and cur_ts.diff(last_log_ts).in_seconds() > 5:
+ metadata['end_of_log'] = True
+ message = (
+ f"*** Log {log_id} not found in elasticsearch. "
+ f"If your task started recently, please wait a moment and reload this page. "
+ f"Otherwise, the logs for this task instance may have been removed."
Review comment:
```suggestion
"If your task started recently, please wait a moment and reload this page. "
"Otherwise, the logs for this task instance may have been removed."
```
nit: these two string literals contain no placeholders, so the `f` prefix is unnecessary.
##########
File path: tests/providers/elasticsearch/log/test_es_task_handler.py
##########
@@ -131,6 +141,28 @@ def test_read(self, ti):
assert '1' == metadatas[0]['offset']
assert timezone.parse(metadatas[0]['last_log_timestamp']) > ts
+ def test_read_missing_logs(self, create_task_instance):
+ """
+ When the log actually isn't there to be found, we only want to wait for 5 seconds.
+ In this case we expect to receive a message of the form 'Log {log_id} not found in elasticsearch ...'
+ """
+ ti = get_ti(
+ self.DAG_ID,
+ self.TASK_ID,
+ pendulum.instance(self.EXECUTION_DATE).add(days=1), # so logs are not found
+ create_task_instance=create_task_instance,
+ )
+ ts = pendulum.now().add(seconds=-6)
+ logs, metadatas = self.es_task_handler.read(ti, 1, {'offset': 0, 'last_log_timestamp': str(ts)})
+
+ assert 1 == len(logs)
+ assert re.match(r'.*Log .* not found in elasticsearch.*', logs[0][0][1]) is not None
Review comment:
```suggestion
assert re.match(r'^\*\*\* Log .* not found in elasticsearch.*', logs[0][0][1]) is not None
```
A little better? This anchors the pattern on the literal `*** ` prefix that the handler's message starts with.
##########
File path: airflow/providers/elasticsearch/log/es_task_handler.py
##########
@@ -187,15 +187,24 @@ def _read(
metadata['end_of_log'] = False if not logs else len(loading_hosts) == 0
cur_ts = pendulum.now()
- # Assume end of log after not receiving new log for 5 min,
- # as executor heartbeat is 1 min and there might be some
- # delay before Elasticsearch makes the log available.
if 'last_log_timestamp' in metadata:
last_log_ts = timezone.parse(metadata['last_log_timestamp'])
- if (
+
+ # if we are not getting any logs at all after more than N seconds of trying,
+ # assume logs do not exist
+ if int(next_offset) == 0 and cur_ts.diff(last_log_ts).in_seconds() > 5:
+ metadata['end_of_log'] = True
+ message = (
+ f"*** Log {log_id} not found in elasticsearch. "
+ f"If your task started recently, please wait a moment and reload this page. "
+ f"Otherwise, the logs for this task instance may have been removed."
+ )
+ return [('', message)], metadata
+ elif (
Review comment:
```suggestion
if (
```
nit: the preceding branch already returns, so a plain `if` is sufficient here instead of `elif`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]