Owen-CH-Leung commented on code in PR #53821:
URL: https://github.com/apache/airflow/pull/53821#discussion_r2327392468


##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -661,13 +602,79 @@ def _get_result(self, hit: dict[Any, Any], 
parent_class=None) -> Hit:
         callback: type[Hit] | Callable[..., Any] = getattr(doc_class, 
"from_es", doc_class)
         return callback(hit)
 
-    def _parse_raw_log(self, log: str) -> list[dict[str, Any]]:
+    def _get_log_message(self, hit: Hit) -> str:
+        """Get log message from hit, supporting both Airflow 2.x and 3.x 
formats."""
+        if hasattr(hit, "event"):
+            return hit.event
+        if hasattr(hit, "message"):
+            return hit.message
+        return ""
+
+
+@attrs.define(kw_only=True)
+class ElasticsearchRemoteLogIO(LoggingMixin):  # noqa: D101
+    json_format: bool = False
+    write_stdout: bool = False
+    delete_local_copy: bool = False
+    host: str = "http://localhost:9200";
+    host_field: str = "host"
+    target_index: str = "airflow-logs"
+    offset_field: str = "offset"
+    write_to_es: bool = False
+    base_log_folder: Path = attrs.field(converter=Path)
+
+    processors = ()
+
+    def __attrs_post_init__(self):
+        es_kwargs = get_es_kwargs_from_config()
+        self.client = elasticsearch.Elasticsearch(self.host, **es_kwargs)
+        self.index_patterns_callable = conf.get("elasticsearch", 
"index_patterns_callable", fallback="")
+        self.PAGE = 0
+        self.MAX_LINE_PER_PAGE = 1000
+        self.index_patterns: str = conf.get("elasticsearch", "index_patterns")
+        self._doc_type_map: dict[Any, Any] = {}
+        self._doc_type: list[Any] = []
+
+    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+        """Write the log to ElasticSearch."""
+        path = Path(path)
+
+        if path.is_absolute():
+            local_loc = path
+        else:
+            local_loc = self.base_log_folder.joinpath(path)
+
+        # Convert the runtimeTI to the real TaskInstance that via fetching 
from DB
+        ti = TaskInstance.get_task_instance(
+            ti.dag_id, ti.run_id, ti.task_id, ti.map_index if ti.map_index is 
not None else -1
+        )  # type: ignore[assignment]

Review Comment:
   I have a test here that performs write & read against a es instance in 
testcontainer & it works.
   
   
https://github.com/apache/airflow/pull/53821/files#diff-93cf9dc05c8d386583395b6b5ae366675a3770523d8c2495de1b9c5f53d016b1R1019
   
   @jason810496 Why do we need to retrieve `log_id_template` before 
constructing the `RuntimeTaskInstance` ? My understanding is that the 
`RuntimeTaskInstance` is a protocol that is not specific to elasticsearch. I 
believe it has nothing to do with the `log_id_template` in db. 
   
   Currently my implementation is to retrieve the `log_id_template` after 
creating the `RuntimeTaskInstance`, and this is used to construct the log_id 
key to be inserted into the elasticsearch. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to