potiuk edited a comment on issue #12969:
URL: https://github.com/apache/airflow/issues/12969#issuecomment-744310938


   It's quite easy for me to reproduce:
   
   1) I first applied https://github.com/apache/airflow/pull/13055 to solve the 
incompatibility of Airflow with snowflake 
(https://github.com/apache/airflow/pull/13056)
   
   2) in files/airflow-breeze-config/init.sh I added this:
   
   ```
   AIRFLOW__LOGGING__REMOTE_LOGGING="True"
   AIRFLOW__LOGGING__ENCRYPT_S3_LOGS="False"
    AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID="aws_default"
    AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER="s3://test-amazon-logging/airflowlogs"
    AWS_ACCESS_KEY_ID="<REDACTED>"
    AWS_SECRET_ACCESS_KEY="<REDACTED>"
   ```
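   (These variables follow Airflow's standard override convention, `AIRFLOW__{SECTION}__{KEY}`, which takes precedence over `airflow.cfg`. A tiny illustrative helper; the function name is mine, not Airflow's:)

```python
def airflow_env_var(section: str, key: str) -> str:
    """Name of the environment variable that overrides [section] key.

    Airflow checks AIRFLOW__{SECTION}__{KEY} (upper-cased, double
    underscores) before falling back to airflow.cfg.
    """
    return f"AIRFLOW__{section.upper()}__{key.upper()}"

print(airflow_env_var("logging", "remote_base_log_folder"))
# AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER
```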
   3) Started Airflow:
   
   ```
   ./breeze start-airflow --backend postgres --load-example-dags \
       --load-default-connections --install-airflow-version 2.0.0rc2 \
       --skip-mounting-local-sources --extras "amazon" --python 3.8
   ```
   
   4) I modified the installed Airflow code to check whether set_context() 
   and close() are called in S3TaskHandler. Modified file: 
   `/usr/local/lib/python3.8/site-packages/airflow/providers/amazon/aws/log/s3_task_handler.py`
   
   I also added your proposed change (referencing `self.hook`) at the end of set_context():
   
   ```
    def set_context(self, ti):
        super().set_context(ti)
        # Local location and remote location is needed to open and
        # upload local log file to S3 remote storage.
        self.log_relative_path = self._render_filename(ti, ti.try_number)
        self.upload_on_close = not ti.raw
        with open("/tmp/set_context.txt", "at") as f:
            f.write("Set context\n")
            f.write(self.log_relative_path + "\n")
            f.write(str(self.upload_on_close) + "\n")
            f.write(self.local_base + "\n")
            f.write(self.remote_base + "\n")
            f.write(self.log_relative_path + "\n")
        # Clear the file first so that duplicate data is not uploaded
        # when re-using the same path (e.g. with rescheduled sensors)
        if self.upload_on_close:
            with open(self.handler.baseFilename, 'w'):
                pass
        # Ensure the hook is connected now -- at close time we will have
        # already disconnected from the DB
        self.hook

    def close(self):
        """Close and upload local log file to remote storage S3."""
        # When application exit, system shuts down all handlers by
        # calling close method. Here we check if logger is already
        # closed to prevent uploading the log to remote storage multiple
        # times when `logging.shutdown` is called.
        with open("/tmp/closing.txt", "at") as f:
            f.write("Closing\n")
            f.write(self.log_relative_path + "\n")
            f.write(str(self.upload_on_close) + "\n")
            f.write(self.local_base + "\n")
            f.write(self.remote_base + "\n")
            f.write(self.log_relative_path + "\n")
        if self.closed:
            return
        super().close()

        if not self.upload_on_close:
            return
        local_loc = os.path.join(self.local_base, self.log_relative_path)
        remote_loc = os.path.join(self.remote_base, self.log_relative_path)
        if os.path.exists(local_loc):
            # read log and remove old logs to get just the latest additions
            with open(local_loc) as logfile:
                log = logfile.read()
            self.s3_write(log, remote_loc)
   ```
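   As a side note, the `self.closed` guard above is what is supposed to prevent double uploads when `logging.shutdown()` runs. A self-contained toy handler showing the shape of that guard (this is not Airflow code; all names here are mine):

```python
import logging

class ToyRemoteHandler(logging.StreamHandler):
    """Minimal stand-in for a remote log handler's close() guard."""

    def __init__(self):
        super().__init__()
        self.uploads = 0            # stands in for calls to s3_write()
        self._already_closed = False

    def close(self):
        # Both logging.shutdown() and an explicit handler.close() land
        # here; the flag makes the "upload" happen at most once.
        if self._already_closed:
            return
        super().close()
        self._already_closed = True
        self.uploads += 1

handler = ToyRemoteHandler()
handler.close()
handler.close()  # no-op: the guard short-circuits
print(handler.uploads)  # 1
```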
   
   4a) I also copied the template config, just to be sure: 
   
   ```
   mkdir ${AIRFLOW_HOME}/config/
   cp /usr/local/lib/python3.8/site-packages/airflow/config_templates/airflow_local_settings.py \
      ${AIRFLOW_HOME}/config/
   ```
   
   5) I killed and restarted the scheduler.
   
   The template config is picked up all right:
   
   ```
   root@6611da4b1a27:/opt/airflow# airflow scheduler
     ____________       _____________
    ____    |__( )_________  __/__  /________      __
   ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
   ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
    _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
    [2020-12-14 09:37:40,496] {scheduler_job.py:1241} INFO - Starting the scheduler
    [2020-12-14 09:37:40,496] {scheduler_job.py:1246} INFO - Processing each file at most -1 times
    [2020-12-14 09:37:40,563] {dag_processing.py:250} INFO - Launched DagFileProcessorManager with pid: 842145
    [2020-12-14 09:37:40,563] {scheduler_job.py:1751} INFO - Resetting orphaned tasks for active dag runs
    [2020-12-14 09:37:40,568] {settings.py:52} INFO - Configured default timezone Timezone('UTC')
    [2020-12-14 09:37:40,569] {settings.py:421} INFO - Loaded airflow_local_settings from /root/airflow/config/airflow_local_settings.py .
   ```
   
   
   6) I ran `tail -f` on both files.
   
   At this point, both set_context() and close() had already been called 
once (with empty relative paths). I think this initial call is unnecessary, 
which is a separate problem, but it has no side effects, so I am ignoring 
it for now. It does show, however, that the handler works.
   
   set_context.txt:
   ```
   Set context
   
   True
   /root/airflow/logs
   s3://test-amazon-logging/airflowlogs
   
   ```
   
   closing.txt: 
   ```
   Closing
   
   True
   /root/airflow/logs
   s3://test-amazon-logging/airflowlogs
   
   ```
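   (If one ever wanted to filter out that empty startup invocation, the upload condition could be tightened along these lines. This is purely a sketch of the idea, not a proposed patch; `should_upload` is a name I made up:)

```python
def should_upload(upload_on_close: bool, log_relative_path: str) -> bool:
    # Skip both the explicit opt-out (ti.raw) and the startup call in
    # which set_context() never rendered a real relative path.
    return upload_on_close and bool(log_relative_path)

print(should_upload(True, ""))  # False: the empty startup invocation
print(should_upload(True, "example_bash_operator/runme_0/1.log"))  # True
```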
   
   7) I manually triggered example_bash_operator via the UI
   
   8) set_context() is called all right:
   ```
   Set context
   example_bash_operator/run_after_loop/2020-12-13T22:51:17.397960+00:00/1.log
   False
   /root/airflow/logs
   s3://test-amazon-logging/airflowlogs
   example_bash_operator/run_after_loop/2020-12-13T22:51:17.397960+00:00/1.log
   ```
   
   But close() is not.
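   One way to confirm which handlers ever reach close(), without hand-editing site-packages as above, would be to wrap `logging.Handler.close` itself. A hedged sketch of that tracing idea:

```python
import logging

close_log = []
_original_close = logging.Handler.close

def traced_close(self):
    # Record every handler close so we can see which handler classes
    # are (or are not) shut down at process exit.
    close_log.append(type(self).__name__)
    _original_close(self)

logging.Handler.close = traced_close

h = logging.StreamHandler()
h.close()
print(close_log)  # ['StreamHandler']

# Restore the original so the patch does not leak further.
logging.Handler.close = _original_close
```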
   
   

