jason810496 commented on code in PR #53821:
URL: https://github.com/apache/airflow/pull/53821#discussion_r2314221763

##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -661,13 +610,66 @@ def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit:
         callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class)
         return callback(hit)
 
-    def _parse_raw_log(self, log: str) -> list[dict[str, Any]]:
+
+@attrs.define(kw_only=True)
+class ElasticsearchRemoteLogIO(LoggingMixin):  # noqa: D101
+    write_stdout: bool = False
+    delete_local_copy: bool = False
+    host: str = "http://localhost:9200"
+    host_field: str = "host"
+    target_index: str = "airflow-logs"
+    offset_field: str = "offset"
+    write_to_es: bool = False
+    base_log_folder: Path = attrs.field(converter=Path)
+
+    processors = ()
+
+    def __attrs_post_init__(self):
+        es_kwargs = get_es_kwargs_from_config()
+        self.client = elasticsearch.Elasticsearch(self.host, **es_kwargs)
+        self.index_patterns_callable = conf.get("elasticsearch", "index_patterns_callable", fallback="")
+        self.PAGE = 0
+        self.MAX_LINE_PER_PAGE = 1000
+        self.index_patterns: str = conf.get("elasticsearch", "index_patterns")
+        self._doc_type_map: dict[Any, Any] = {}
+        self._doc_type: list[Any] = []
+
+    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+        """Write the log to ElasticSearch."""
+        path = Path(path)
+
+        if path.is_absolute():
+            local_loc = path
+        else:
+            local_loc = self.base_log_folder.joinpath(path)
+
+        if local_loc.is_file() and self.write_stdout:
+            # Intentionally construct the log_id and offset field
+            log_lines = self._parse_raw_log(local_loc.read_text(), ti)
+            for line in log_lines:
+                sys.stdout.write(json.dumps(line) + "\n")
+                sys.stdout.flush()
+
+        if local_loc.is_file() and self.write_to_es:
+            log_lines = self._parse_raw_log(local_loc.read_text(), ti)
+            success = self._write_to_es(log_lines)
+            if success and self.delete_local_copy:
+                shutil.rmtree(os.path.dirname(local_loc))
+
+    def _parse_raw_log(self, log: str, ti: RuntimeTI) -> list[dict[str, Any]]:
         logs = log.split("\n")
         parsed_logs = []
+        offset = 1
         for line in logs:
             # Make sure line is not empty
             if line.strip():
-                parsed_logs.append(json.loads(line))
+                # construct log_id which is {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}
+                # also construct the offset field (default is 'offset')
+                log_dict = json.loads(line)
+                log_id = f"{ti.dag_id}-{ti.task_id}-{ti.run_id}-{ti.map_index}-{ti.try_number}"

Review Comment:
   This comment is resolved by the `_render_log_id` helper.
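
For illustration only, here is a minimal sketch of the kind of delegation the comment refers to: factoring the inline f-string into a `_render_log_id`-style helper that `_parse_raw_log` calls. The helper name echoes the one mentioned above, but its signature, the `log_id` dictionary key, and the stand-in task-instance object are assumptions for this sketch, not the PR's actual implementation.

```python
# Sketch only: the real handler's _render_log_id may differ (e.g. it can be
# driven by a configurable log_id template). Field names below are the ones
# visible in the diff above.
from __future__ import annotations

import json
from types import SimpleNamespace
from typing import Any


def _render_log_id(ti: Any, try_number: int | None = None) -> str:
    """Build the log_id as {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}."""
    attempt = ti.try_number if try_number is None else try_number
    return f"{ti.dag_id}-{ti.task_id}-{ti.run_id}-{ti.map_index}-{attempt}"


def _parse_raw_log(log: str, ti: Any, offset_field: str = "offset") -> list[dict[str, Any]]:
    """Parse newline-delimited JSON log lines, attaching a log_id and an offset."""
    parsed_logs: list[dict[str, Any]] = []
    offset = 1
    for line in log.split("\n"):
        if not line.strip():
            # skip blank lines
            continue
        log_dict = json.loads(line)
        # delegate log_id construction instead of inlining the f-string
        log_dict["log_id"] = _render_log_id(ti)
        log_dict[offset_field] = offset
        parsed_logs.append(log_dict)
        offset += 1
    return parsed_logs


# Example usage with a stand-in task instance (hypothetical values):
ti = SimpleNamespace(
    dag_id="my_dag",
    task_id="my_task",
    run_id="manual__2024-01-01",
    map_index=-1,
    try_number=1,
)
print(_parse_raw_log('{"event": "hello"}\n{"event": "world"}\n', ti))
```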