potiuk edited a comment on issue #12969: URL: https://github.com/apache/airflow/issues/12969#issuecomment-744310938
It's quite easy to reproduce for me:

1) I first applied https://github.com/apache/airflow/pull/13055 to solve an incompatibility of Airflow with snowflake (https://github.com/apache/airflow/pull/13056).

2) In `files/airflow-breeze-config/init.sh` I added this:

```
AIRFLOW__LOGGING__REMOTE_LOGGING="True"
AIRFLOW__LOGGING__ENCRYPT_S3_LOGS="False"
AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID="aws_default"
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER="s3://test-amazon-logging/airflowlogs"
AWS_ACCESS_KEY_ID="<REDACTED>"
AWS_SECRET_ACCESS_KEY="<REDACTED>"
```

3) I started Airflow:

```
./breeze start-airflow --backend postgres --load-example-dags --load-default-connections \
    --install-airflow-version 2.0.0rc2 --skip-mounting-local-sources --extras "amazon" --python 3.8
```

3a) I modified the installed Airflow code to see whether `set_context()` and `close()` have been called in `S3TaskHandler`. Modified file: `/usr/local/lib/python3.8/site-packages/airflow/providers/amazon/aws/log/s3_task_handler.py`. I also added the proposal from you (`self.hook`) at the end of `set_context()`:

```
def set_context(self, ti):
    super().set_context(ti)
    # Local location and remote location is needed to open and
    # upload local log file to S3 remote storage.
    self.log_relative_path = self._render_filename(ti, ti.try_number)
    self.upload_on_close = not ti.raw
    with open("/tmp/set_context.txt", "at") as f:
        f.write("Set context\n")
        f.write(self.log_relative_path + "\n")
        f.write(str(self.upload_on_close) + "\n")
        f.write(self.local_base + "\n")
        f.write(self.remote_base + "\n")
        f.write(self.log_relative_path + "\n")
    # Clear the file first so that duplicate data is not uploaded
    # when re-using the same path (e.g. with rescheduled sensors)
    if self.upload_on_close:
        with open(self.handler.baseFilename, 'w'):
            pass
    # Ensure the hook is connected now -- at close time we will have
    # already disconnected from the DB
    self.hook

def close(self):
    """Close and upload local log file to remote storage S3."""
    # When application exit, system shuts down all handlers by
    # calling close method. Here we check if logger is already
    # closed to prevent uploading the log to remote storage multiple
    # times when `logging.shutdown` is called.
    with open("/tmp/closing.txt", "at") as f:
        f.write("Closing\n")
        f.write(self.log_relative_path + "\n")
        f.write(str(self.upload_on_close) + "\n")
        f.write(self.local_base + "\n")
        f.write(self.remote_base + "\n")
        f.write(self.log_relative_path + "\n")
    if self.closed:
        return
    super().close()
    if not self.upload_on_close:
        return
    local_loc = os.path.join(self.local_base, self.log_relative_path)
    remote_loc = os.path.join(self.remote_base, self.log_relative_path)
    if os.path.exists(local_loc):
        # read log and remove old logs to get just the latest additions
        with open(local_loc) as logfile:
            log = logfile.read()
        self.s3_write(log, remote_loc)
```

3b) I also copied the template config just to be sure:

```
mkdir ${AIRFLOW_HOME}/config/
cp /usr/local/lib/python3.8/site-packages/airflow/config_templates/airflow_local_settings.py ${AIRFLOW_HOME}/config/
```

4) I killed and restarted the scheduler. The template config is used all right:

```
root@6611da4b1a27:/opt/airflow# airflow scheduler
  ____________       _____________
 ____    |__( )_________  __/__ /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-12-14 09:37:40,496] {scheduler_job.py:1241} INFO -
Starting the scheduler
[2020-12-14 09:37:40,496] {scheduler_job.py:1246} INFO - Processing each file at most -1 times
[2020-12-14 09:37:40,563] {dag_processing.py:250} INFO - Launched DagFileProcessorManager with pid: 842145
[2020-12-14 09:37:40,563] {scheduler_job.py:1751} INFO - Resetting orphaned tasks for active dag runs
[2020-12-14 09:37:40,568] {settings.py:52} INFO - Configured default timezone Timezone('UTC')
[2020-12-14 09:37:40,569] {settings.py:421} INFO - Loaded airflow_local_settings from /root/airflow/config/airflow_local_settings.py .
```

5) I run `tail -f` on both files. At this point, both set_context and close were called once (with empty relative paths). I think this call is unnecessary, and this is another problem, but it does not introduce any side effects, so I ignore it for now. It shows, however, that the handler works.

set_context.txt:

```
Set context
True
/root/airflow/logs
s3://test-amazon-logging/airflowlogs
```

closing.txt:

```
Closing
True
/root/airflow/logs
s3://test-amazon-logging/airflowlogs
```

6) I manually (via the UI) triggered `example_bash_operator`.

7) `set_context()` is called all right:

```
Set context
example_bash_operator/run_after_loop/2020-12-13T22:51:17.397960+00:00/1.log
False
/root/airflow/logs
s3://test-amazon-logging/airflowlogs
example_bash_operator/run_after_loop/2020-12-13T22:51:17.397960+00:00/1.log
```

But `close()` is not.
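As an aside on step 2: Airflow lets you override any config option with an environment variable named `AIRFLOW__{SECTION}__{KEY}`, which is why the variables above land in the `[logging]` section. A rough sketch of that mapping (my own toy parser for illustration, not Airflow's implementation):

```python
def parse_airflow_env(environ):
    """Collect AIRFLOW__SECTION__KEY overrides into {section: {key: value}}."""
    overrides = {}
    for name, value in environ.items():
        if not name.startswith("AIRFLOW__"):
            continue
        # Split at most twice: prefix, section, then the key (which may
        # itself contain underscores, e.g. REMOTE_BASE_LOG_FOLDER).
        _, section, key = name.split("__", 2)
        overrides.setdefault(section.lower(), {})[key.lower()] = value
    return overrides

env = {
    "AIRFLOW__LOGGING__REMOTE_LOGGING": "True",
    "AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER": "s3://test-amazon-logging/airflowlogs",
    "PATH": "/usr/bin",  # ignored: not an AIRFLOW__ override
}
print(parse_airflow_env(env))
```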
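For reference, Python's logging machinery only calls a handler's `close()` when the handler is closed explicitly or when `logging.shutdown()` runs at interpreter exit; that is the mechanism the instrumentation above relies on. A minimal sketch of it (`MarkerHandler` is a hypothetical stand-in for `S3TaskHandler`, not Airflow code):

```python
import logging

class MarkerHandler(logging.Handler):
    """Hypothetical stand-in for S3TaskHandler: records whether close() ran."""

    def __init__(self):
        super().__init__()
        self.closed_marker = False

    def emit(self, record):
        pass  # no-op: we only care about the lifecycle calls

    def close(self):
        # This is where S3TaskHandler would upload the local log file.
        self.closed_marker = True
        super().close()

handler = MarkerHandler()
logger = logging.getLogger("airflow.task.demo")
logger.addHandler(handler)
logger.warning("task log line")

# logging.shutdown() walks all registered handlers, flushing and closing
# each one. If a process never reaches it, close() is silently skipped.
logging.shutdown()
print(handler.closed_marker)  # -> True
```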
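One possible explanation for `close()` never firing (a hypothesis for illustration, not something the logs above confirm): if the task process exits hard via `os._exit()`, neither `atexit` hooks nor `logging.shutdown()` run, so no handler's `close()` is ever called. A self-contained demonstration using a throwaway child process:

```python
import subprocess
import sys
import textwrap

# The child registers a handler that prints when close() runs, then
# exits via os._exit() -- bypassing atexit and logging.shutdown().
child_code = textwrap.dedent("""
    import logging, os

    class MarkerHandler(logging.Handler):
        def emit(self, record):
            pass
        def close(self):
            print("close() was called")
            super().close()

    logging.getLogger().addHandler(MarkerHandler())
    logging.getLogger().warning("a log line")
    os._exit(0)  # hard exit: logging.shutdown() is never reached
""")

result = subprocess.run(
    [sys.executable, "-c", child_code], capture_output=True, text=True
)
# Nothing on stdout: close() was skipped entirely.
print(repr(result.stdout))  # -> ''
```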
