jason810496 commented on code in PR #53821:
URL: https://github.com/apache/airflow/pull/53821#discussion_r2314221763


##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -661,13 +610,66 @@ def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit:
         callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class)
         return callback(hit)
 
-    def _parse_raw_log(self, log: str) -> list[dict[str, Any]]:
+
+@attrs.define(kw_only=True)
+class ElasticsearchRemoteLogIO(LoggingMixin):  # noqa: D101
+    write_stdout: bool = False
+    delete_local_copy: bool = False
+    host: str = "http://localhost:9200"
+    host_field: str = "host"
+    target_index: str = "airflow-logs"
+    offset_field: str = "offset"
+    write_to_es: bool = False
+    base_log_folder: Path = attrs.field(converter=Path)
+
+    processors = ()
+
+    def __attrs_post_init__(self):
+        es_kwargs = get_es_kwargs_from_config()
+        self.client = elasticsearch.Elasticsearch(self.host, **es_kwargs)
+        self.index_patterns_callable = conf.get("elasticsearch", "index_patterns_callable", fallback="")
+        self.PAGE = 0
+        self.MAX_LINE_PER_PAGE = 1000
+        self.index_patterns: str = conf.get("elasticsearch", "index_patterns")
+        self._doc_type_map: dict[Any, Any] = {}
+        self._doc_type: list[Any] = []
+
+    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+        """Write the log to ElasticSearch."""
+        path = Path(path)
+
+        if path.is_absolute():
+            local_loc = path
+        else:
+            local_loc = self.base_log_folder.joinpath(path)
+
+        if local_loc.is_file() and self.write_stdout:
+            # Intentionally construct the log_id and offset field
+            log_lines = self._parse_raw_log(local_loc.read_text(), ti)
+            for line in log_lines:
+                sys.stdout.write(json.dumps(line) + "\n")
+                sys.stdout.flush()
+
+        if local_loc.is_file() and self.write_to_es:
+            log_lines = self._parse_raw_log(local_loc.read_text(), ti)
+            success = self._write_to_es(log_lines)
+            if success and self.delete_local_copy:
+                shutil.rmtree(os.path.dirname(local_loc))
+
+    def _parse_raw_log(self, log: str, ti: RuntimeTI) -> list[dict[str, Any]]:
         logs = log.split("\n")
         parsed_logs = []
+        offset = 1
         for line in logs:
             # Make sure line is not empty
             if line.strip():
-                parsed_logs.append(json.loads(line))
+                # construct log_id which is {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}
+                # also construct the offset field (default is 'offset')
+                log_dict = json.loads(line)
+                log_id = f"{ti.dag_id}-{ti.task_id}-{ti.run_id}-{ti.map_index}-{ti.try_number}"

Review Comment:
   This comment is resolved by the `_render_log_id` helper.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to