rhwang10 commented on a change in pull request #4303: [AIRFLOW-3370]
Elasticsearch log task handler additional features
URL: https://github.com/apache/airflow/pull/4303#discussion_r253881116
##########
File path: airflow/utils/log/es_task_handler.py
##########
@@ -68,12 +118,115 @@ def __init__(self, base_log_folder, filename_template,
self.mark_end_on_close = True
self.end_of_log_mark = end_of_log_mark
def set_context(self, ti):
    """
    Provide task-instance context to the handler.

    When ``write_stdout`` is enabled, task logs are emitted to stdout so an
    external collector (e.g. FluentD) can ship them to Elasticsearch;
    otherwise this delegates to the file-based parent handler.

    :param ti: task instance object
    """
    if self.write_stdout:
        # Redirect stdout through the wrapper so task logs and plain
        # prints share a single stream.
        self.writer = ParentStdout()
        sys.stdout = self.writer

        self.taskInstance = self._process_task_instance(ti)

        self.handler = logging.StreamHandler(stream=sys.stdout)

        if self.json_format:
            self.handler.setFormatter(JsonFormatter(self.record_labels,
                                                    self.taskInstance))
        else:
            self.handler.setFormatter(self.formatter)
        # Bug fix: apply the level for both formatter choices. Previously
        # it was only set on the non-JSON branch, leaving a JSON-formatted
        # handler at NOTSET.
        self.handler.setLevel(self.level)
    else:
        super(ElasticsearchTaskHandler, self).set_context(ti)
        self.mark_end_on_close = not ti.raw
+
def emit(self, record):
    """
    Emit a log record, either via the stdout stream handler or via the
    parent (file-based) handler.

    :param record: logging.LogRecord to emit
    """
    if self.write_stdout:
        self.formatter.format(record)
        if self.handler is not None:
            self.handler.emit(record)
    else:
        # Bug fix: super() needs both the class and the instance.
        # super(ElasticsearchTaskHandler).emit(record) builds an unbound
        # super object and raises AttributeError at runtime.
        super(ElasticsearchTaskHandler, self).emit(record)
+
def flush(self):
    """Flush the stdout stream handler, if one has been created."""
    handler = self.handler
    if handler is not None:
        handler.flush()
+
+ def _process_task_instance(self, ti):
+ """
+ Process task instance information to create a log_id
+ key for Elasticsearch
+ """
+ ti_info = {'dag_id': str(ti.dag_id),
+ 'task_id': str(ti.task_id),
+ 'execution_date': str(ti.execution_date),
+ 'try_number': str(ti.try_number)}
+ return ti_info
+
def read(self, task_instance, try_number=None, metadata=None):
    """
    Read logs of a given task instance from Elasticsearch.

    :param task_instance: task instance object
    :param try_number: task instance try_number to read logs from.
        If None, it will start at 1.
    :param metadata: log metadata dict used by the caller for
        incremental reads
    :return: tuple of (logs, metadatas), one entry per try number
    """
    if not self.write_stdout:
        return super(ElasticsearchTaskHandler, self) \
            .read(task_instance, try_number, metadata)

    if try_number is None:
        next_try = task_instance.next_try_number
        try_numbers = list(range(1, next_try))
    elif try_number < 1:
        # Bug fix: every other path returns (logs, metadatas), so the
        # error path must return the same two-element shape or callers
        # that unpack the result will crash. The old code also embedded a
        # long run of spaces into the message via a backslash
        # continuation inside the string literal.
        error = ('Error fetching the logs. '
                 'Try number {} is invalid.'.format(try_number))
        return [error], [{'end_of_log': True}]
    else:
        try_numbers = [try_number]

    logs = [''] * len(try_numbers)
    metadatas = [{}] * len(try_numbers)
    for i, try_number in enumerate(try_numbers):
        log, metadata = self._read(task_instance, try_number, metadata)

        # If there's a log present, stop polling: set end_of_log so the
        # ti_log.html script does not keep re-querying Elasticsearch for
        # updates, and skip writing the end-of-log marker on close.
        if log:
            logs[i] += log
            metadata['end_of_log'] = True
            self.mark_end_on_close = False
        else:
            metadata['end_of_log'] = False
        metadatas[i] = metadata

    return logs, metadatas
+
def _render_log_id(self, ti, try_number):
+ # Using Jinja2 templating
if self.log_id_jinja_template:
jinja_context = ti.get_template_context()
jinja_context['try_number'] = try_number
return self.log_id_jinja_template.render(**jinja_context)
+ # Make log_id ES Query friendly if using standard out option
Review comment:
> Really appreciate all the detailed docs in the PR and comments 👍 Pardon me
for being unfamiliar with the EFK stack: if we are not in a k8s env and multiple
tasks were run under the same main airflow worker process, can we easily
separate those logs from different tis? If in a k8s env and we have 1 ti per
pod, will this include the logs from the main process into the ti log?
@KevinYang21 Thanks for the very detailed review! I loved reading through your
comments, and I am looking into addressing all of them. Each TI that runs on a
worker process is tagged with a `log_id` field, but the logs from the main
process are not. When a log entry is read from elasticsearch, it expects that
`log_id` to be part of the http request. So the logs from the main process
won't be a part of the subsequent response, since the `log_id` will be
empty/null. In the EFK stack, we tag every TI log with a log_id in the fluentD
pipeline before sending it off to elasticsearch, but a user can do it any way
they'd like (Logstash, Beats) as long as that `log_id` field is present.
(https://github.com/astronomer/helm.astronomer.io/blob/master/charts/fluentd/templates/fluentd-configmap.yaml#L135-L143).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services