rhwang10 commented on a change in pull request #4303: [AIRFLOW-3370]
Elasticsearch log task handler additional features
URL: https://github.com/apache/airflow/pull/4303#discussion_r258987877
##########
File path: airflow/utils/log/es_task_handler.py
##########
@@ -68,12 +118,115 @@ def __init__(self, base_log_folder, filename_template,
self.mark_end_on_close = True
self.end_of_log_mark = end_of_log_mark
+ def set_context(self, ti):
+ if self.write_stdout:
+ self.writer = ParentStdout()
+ sys.stdout = self.writer
+
+ self.taskInstance = self._process_task_instance(ti)
+
+ self.handler = logging.StreamHandler(stream=sys.stdout)
+
+ if self.json_format:
+ self.handler.setFormatter(JsonFormatter(self.record_labels,
+ self.taskInstance))
+ elif not self.json_format:
+ self.handler.setFormatter(self.formatter)
+ self.handler.setLevel(self.level)
+
+ elif not self.write_stdout:
+ super(ElasticsearchTaskHandler, self).set_context(ti)
+ self.mark_end_on_close = not ti.raw
+
+ def emit(self, record):
+ if self.write_stdout:
+ self.formatter.format(record)
+ if self.handler is not None:
+ self.handler.emit(record)
+ elif not self.write_stdout:
+ super(ElasticsearchTaskHandler).emit(record)
+
+ def flush(self):
+ if self.handler is not None:
+ self.handler.flush()
+
+ def _process_task_instance(self, ti):
+ """
+ Process task instance information to create a log_id
+ key for Elasticsearch
+ """
+ ti_info = {'dag_id': str(ti.dag_id),
+ 'task_id': str(ti.task_id),
+ 'execution_date': str(ti.execution_date),
+ 'try_number': str(ti.try_number)}
+ return ti_info
+
+ def read(self, task_instance, try_number=None, metadata=None):
+ """
+ Read logs of a given task instance from elasticsearch.
+ :param task_instance: task instance object
+ :param try_number: task instance try_number to read logs from.
+ If None,it will start at 1.
+ """
+ if self.write_stdout:
+ if try_number is None:
+ next_try = task_instance.next_try_number
+ try_numbers = list(range(1, next_try))
+ elif try_number < 1:
+ logs = [
+ 'Error fetching the logs. \
+ Try number {} is invalid.'.format(try_number),
+ ]
+ return logs
+ else:
+ try_numbers = [try_number]
+
+ logs = [''] * len(try_numbers)
+ metadatas = [{}] * len(try_numbers)
+ for i, try_number in enumerate(try_numbers):
+ log, metadata = self._read(task_instance,
+ try_number,
+ metadata)
+
+ # If there's a log present, then we don't want to keep
+ # checking. Set end_of_log to True, set the
+ # mark_end_on_close to False and return the log and
+ # metadata. This will prevent the recursion from happening
+ # in the ti_log.html script and will therefore prevent
+ # constantly checking ES for updates, since we've
+ # fetched what we're looking for
+ if log:
+ logs[i] += log
+ metadata['end_of_log'] = True
+ self.mark_end_on_close = False
+ metadatas[i] = metadata
+ elif not log:
+ metadata['end_of_log'] = False
+ metadatas[i] = metadata
+
+ return logs, metadatas
+ elif not self.write_stdout:
+ return super(ElasticsearchTaskHandler, self) \
+ .read(task_instance, try_number, metadata)
+
def _render_log_id(self, ti, try_number):
+ # Using Jinja2 templating
if self.log_id_jinja_template:
jinja_context = ti.get_template_context()
jinja_context['try_number'] = try_number
return self.log_id_jinja_template.render(**jinja_context)
+ # Make log_id ES Query friendly if using standard out option
Review comment:
@schnie Yep, these string replacements remove reserved characters that were
breaking the ES query and causing it to return irrelevant log matches.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services