dstandish commented on a change in pull request #21261:
URL: https://github.com/apache/airflow/pull/21261#discussion_r798225279



##########
File path: airflow/providers/elasticsearch/log/es_task_handler.py
##########
@@ -187,15 +187,29 @@ def _read(
         metadata['end_of_log'] = False if not logs else len(loading_hosts) == 0
 
         cur_ts = pendulum.now()
-        # Assume end of log after not receiving new log for 5 min,
-        # as executor heartbeat is 1 min and there might be some
-        # delay before Elasticsearch makes the log available.
         if 'last_log_timestamp' in metadata:
             last_log_ts = timezone.parse(metadata['last_log_timestamp'])
+
+            # if we are not getting any logs at all after more than N seconds of trying,
+            # assume logs do not exist
+            if int(next_offset) == 0 and cur_ts.diff(last_log_ts).in_seconds() > 5:
+                metadata['end_of_log'] = True
+                return [
+                    (
+                        '',
+                        (
+                            f"*** Log {log_id} not found in elasticsearch. "

Review comment:
       Odd... if that were true, I'd expect the wordlist to be all lowercase. But let's find out!
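For context on the hunk being discussed, here is a minimal standalone sketch of the "give up if nothing has arrived" check it adds. It assumes stdlib `datetime` in place of `pendulum`/`airflow.utils.timezone`, and `check_log_missing` and `NOT_FOUND_TIMEOUT_SECONDS` are hypothetical names used only for illustration, not part of the handler. The 5-second threshold is taken from the diff. `abs()` is used because pendulum's `diff()` returns an absolute interval by default.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical constant; the diff hard-codes 5 while its comment says "N seconds".
NOT_FOUND_TIMEOUT_SECONDS = 5


def check_log_missing(metadata: dict, next_offset, now: datetime,
                      timeout: float = NOT_FOUND_TIMEOUT_SECONDS) -> bool:
    """Hypothetical helper mirroring the diff's logic.

    Returns True and marks metadata['end_of_log'] when no log lines have been
    read yet (offset 0) and more than `timeout` seconds have elapsed since
    metadata['last_log_timestamp'].
    """
    if 'last_log_timestamp' not in metadata:
        return False
    last_log_ts = datetime.fromisoformat(metadata['last_log_timestamp'])
    # Absolute difference, like pendulum's DateTime.diff() default.
    elapsed = abs((now - last_log_ts).total_seconds())
    if int(next_offset) == 0 and elapsed > timeout:
        metadata['end_of_log'] = True
        return True
    return False


if __name__ == '__main__':
    start = datetime(2022, 2, 3, 12, 0, 0, tzinfo=timezone.utc)
    meta = {'last_log_timestamp': start.isoformat(), 'end_of_log': False}
    print(check_log_missing(meta, 0, start + timedelta(seconds=3)))   # False: still within the window
    print(check_log_missing(meta, 0, start + timedelta(seconds=10)))  # True: nothing arrived, give up
```

The short window only applies while the offset is still 0. Once any log line has been read, this check never fires, so slow but existing logs are not cut off by it.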




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
