jason810496 commented on code in PR #53821:
URL: https://github.com/apache/airflow/pull/53821#discussion_r2240263369


##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -331,9 +334,10 @@ def _read(
 
         # end_of_log_mark may contain characters like '\n' which is needed to
         # have the log uploaded but will not be stored in elasticsearch.
+        print(f"self.end_of_log_mark = {self.end_of_log_mark}")

Review Comment:
   Please remove this debug `print`:
   ```suggestion
   ```



##########
providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py:
##########
@@ -885,25 +884,6 @@ def test_filename_template_for_backward_compatibility(self):
             filename_template=None,
         )
 

Review Comment:
   A unit test for `ElasticsearchRemoteLogIO` is required. Would you mind adding one? Thanks!
   
   The following unit test for `S3RemoteLogIO` can serve as a good reference:
   
   
https://github.com/apache/airflow/blob/4b3b62ddbfb5c61e739453ba9e11912cf3a39cc0/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py#L49



##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -331,9 +334,10 @@ def _read(
 
         # end_of_log_mark may contain characters like '\n' which is needed to
         # have the log uploaded but will not be stored in elasticsearch.
+        print(f"self.end_of_log_mark = {self.end_of_log_mark}")
         metadata["end_of_log"] = False
         if logs_by_host:
-            if any(x[-1].message == self.end_of_log_mark for x in logs_by_host.values()):
+            if any(x[-1].event == self.end_of_log_mark for x in logs_by_host.values()):
                 metadata["end_of_log"] = True

Review Comment:
   May I ask whether changing `.message` to `.event` is a breaking change, and why we need to change it?
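To make the compatibility concern concrete, here is a minimal sketch of the end-of-log check, parameterized on the field it reads. It assumes (this is not confirmed by the diff) that documents indexed by an older provider version carry the mark in `message` while documents indexed by the new code carry it in `event`; under that assumption, a check that only reads `event` never sets `end_of_log` for older logs. The hits are modeled with `SimpleNamespace`, and the mark value and the `is_end_of_log_compat` helper are hypothetical, shown only to illustrate one way to tolerate both fields.

```python
from types import SimpleNamespace

# Hypothetical mark value; the handler's real value comes from its configuration.
END_OF_LOG_MARK = "end_of_log"


def is_end_of_log(logs_by_host, end_of_log_mark, field="event"):
    """True if the last hit for any host carries ``end_of_log_mark`` in ``field``."""
    # getattr with a default: a hit that lacks ``field`` simply does not match.
    return any(
        getattr(hits[-1], field, None) == end_of_log_mark
        for hits in logs_by_host.values()
    )


def is_end_of_log_compat(logs_by_host, end_of_log_mark):
    """Hypothetical tolerant variant: accept the mark under ``event`` or ``message``."""
    return any(
        end_of_log_mark
        in (getattr(hits[-1], "event", None), getattr(hits[-1], "message", None))
        for hits in logs_by_host.values()
    )


# Assumed document shapes: old provider -> ``message``, new code -> ``event``.
old_hits = {"worker-1": [SimpleNamespace(message="task output"), SimpleNamespace(message=END_OF_LOG_MARK)]}
new_hits = {"worker-1": [SimpleNamespace(event="task output"), SimpleNamespace(event=END_OF_LOG_MARK)]}

print(is_end_of_log(old_hits, END_OF_LOG_MARK, field="message"))  # True: the old check
print(is_end_of_log(old_hits, END_OF_LOG_MARK))  # False: the new check misses old documents
print(is_end_of_log_compat(old_hits, END_OF_LOG_MARK))  # True
print(is_end_of_log_compat(new_hits, END_OF_LOG_MARK))  # True
```

Whether the change is actually breaking depends on which field the stored documents really use, which the diff alone does not show.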



##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -530,18 +537,6 @@ def close(self) -> None:
             self.handler.close()
             sys.stdout = sys.__stdout__
 
-        if self.write_to_es and not self.write_stdout:
-            full_path = self.handler.baseFilename  # type: ignore[union-attr]
-            log_relative_path = pathlib.Path(full_path).relative_to(self.local_base).as_posix()
-            local_loc = os.path.join(self.local_base, log_relative_path)
-            if os.path.exists(local_loc):
-                # read log and remove old logs to get just the latest additions
-                log = pathlib.Path(local_loc).read_text()
-                log_lines = self._parse_raw_log(log)
-                success = self._write_to_es(log_lines)
-                if success and self.delete_local_copy:
-                    shutil.rmtree(os.path.dirname(local_loc))
-
         super().close()

Review Comment:
   I think we need to keep this block: `set_context` is not used in Airflow 3, but it can still be used in Airflow `2.x`. Removing the cleanup of temporary local logs would change the behavior for users who install the latest ES provider on Airflow `2.x`.
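For reference, the removed block performs a flush-then-delete step on `close()`. The sketch below pulls that step into a hypothetical standalone function, `flush_local_log`, so the behavior at stake on Airflow `2.x` can be seen in isolation. The `write_to_es` callback stands in for `self._write_to_es`, and `splitlines()` stands in for `self._parse_raw_log`, whose real parsing is not shown in the diff; both substitutions are assumptions of the sketch, not the provider's code.

```python
import os
import pathlib
import shutil
from typing import Callable


def flush_local_log(
    full_path: str,
    local_base: str,
    write_to_es: Callable[[list[str]], bool],
    *,
    write_to_es_enabled: bool = True,
    write_stdout: bool = False,
    delete_local_copy: bool = False,
) -> bool:
    """Ship a local task log to ES and optionally delete its directory.

    Mirrors the block removed from ``close()``; returns True if the write succeeded.
    """
    # Same guard as the original: only runs when writing to ES and not to stdout.
    if not write_to_es_enabled or write_stdout:
        return False
    log_relative_path = pathlib.Path(full_path).relative_to(local_base).as_posix()
    local_loc = os.path.join(local_base, log_relative_path)
    if not os.path.exists(local_loc):
        return False
    log = pathlib.Path(local_loc).read_text()
    # Stand-in for self._parse_raw_log(log): one entry per line.
    log_lines = log.splitlines()
    success = write_to_es(log_lines)
    if success and delete_local_copy:
        # Like the original, this removes the whole directory holding the log file.
        shutil.rmtree(os.path.dirname(local_loc))
    return success
```

Passing the writer in as a callback, rather than calling Elasticsearch directly, keeps the step testable without a running cluster.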



-- 
This is an automated message from the Apache Git Service.