Owen-CH-Leung commented on code in PR #53821: URL: https://github.com/apache/airflow/pull/53821#discussion_r2327491621
##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -411,46 +448,16 @@ def _format_msg(self, hit: Hit):
             )
         # Just a safe-guard to preserve backwards-compatibility
-        return hit.message
-
-    def _es_read(self, log_id: str, offset: int | str, ti: TaskInstance) -> ElasticSearchResponse | None:
-        """
-        Return the logs matching log_id in Elasticsearch and next offset or ''.
-
-        :param log_id: the log_id of the log to read.
-        :param offset: the offset start to read log from.
-        :param ti: the task instance object
-
-        :meta private:
-        """
-        query: dict[Any, Any] = {
-            "bool": {
-                "filter": [{"range": {self.offset_field: {"gt": int(offset)}}}],
-                "must": [{"match_phrase": {"log_id": log_id}}],
-            }
-        }
-
-        index_patterns = self._get_index_patterns(ti)
-        try:
-            max_log_line = self.client.count(index=index_patterns, query=query)["count"]
-        except NotFoundError as e:
-            self.log.exception("The target index pattern %s does not exist", index_patterns)
-            raise e
-
-        if max_log_line != 0:
-            try:
-                res = self.client.search(
-                    index=index_patterns,
-                    query=query,
-                    sort=[self.offset_field],
-                    size=self.MAX_LINE_PER_PAGE,
-                    from_=self.MAX_LINE_PER_PAGE * self.PAGE,
-                )
-                return ElasticSearchResponse(self, res)
-            except Exception as err:
-                self.log.exception("Could not read log with log_id: %s. Exception: %s", log_id, err)
-
-        return None
+        # In Airflow 2.x, the log record JSON has a "message" key, e.g.:
+        # {
+        #   "message": "Dag name:dataset_consumes_1 queued_at:2025-08-12 15:05:57.703493+00:00",
+        #   "offset": 1755011166339518208,
+        #   "log_id": "dataset_consumes_1-consuming_1-manual__2025-08-12T15:05:57.691303+00:00--1-1"
+        # }
+        #
+        # In Airflow 3.x, the "message" field is renamed to "event".
+        # We check the correct attribute depending on the Airflow major version.

Review Comment:
Fixed. Thanks

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org