klyusba opened a new issue #9721:
URL: https://github.com/apache/airflow/issues/9721


   **Apache Airflow version**: 1.10.10
   
   **Environment**: puckel/docker-airflow with celery executor
   
   **What happened**:
   
   PostgresHook inside PythonOperator causes cryptography.fernet.InvalidToken 
error.
   PostgresOperator causes no error in the same dag.
   
   **What you expected to happen**:
   
   PostgresHook causes no error within PythonOperator as it does within 
PostgresOperator 
   
   **How to reproduce it**:
   
   <details><summary>dag example</summary>
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.hooks.postgres_hook import PostgresHook
   from airflow.operators.postgres_operator import PostgresOperator
   import datetime
   
   
   default_args = {
       "owner": "airflow",
       "start_date": datetime.datetime(2020, 3, 4),
   }
   
   dag = DAG(
       "log_test2", 
       default_args=default_args,
       schedule_interval=None
   )
   
   
   def test():
       h = PostgresHook(pg_conn_id="pg_dc_dwhmeta")
       res = h.get_records("select 1;")
       print(res)
   
   
   t1 = PostgresOperator(
       task_id="t1", 
       sql="select 1;", 
       postgres_conn_id="pg_dc_dwhmeta", 
       dag=dag
   )
   
   t2 = PythonOperator(
       task_id='t2',
       python_callable=test,
       dag=dag
   )
   
   t1>>t2
   
   </details>
   
   
   **Anything else we need to know**:
   
   <details><summary>t1.log</summary> 
   [2020-07-08 13:49:05,134] {{taskinstance.py:669}} INFO - Dependencies all 
met for <TaskInstance: log_test2.t1 2020-07-08T13:48:49.898014+00:00 [queued]>
   [2020-07-08 13:49:05,333] {{taskinstance.py:669}} INFO - Dependencies all 
met for <TaskInstance: log_test2.t1 2020-07-08T13:48:49.898014+00:00 [queued]>
   [2020-07-08 13:49:05,333] {{taskinstance.py:879}} INFO - 
   
--------------------------------------------------------------------------------
   [2020-07-08 13:49:05,333] {{taskinstance.py:880}} INFO - Starting attempt 1 
of 1
   [2020-07-08 13:49:05,333] {{taskinstance.py:881}} INFO - 
   
--------------------------------------------------------------------------------
   [2020-07-08 13:49:05,471] {{taskinstance.py:900}} INFO - Executing 
<Task(PostgresOperator): t1> on 2020-07-08T13:48:49.898014+00:00
   [2020-07-08 13:49:05,477] {{standard_task_runner.py:53}} INFO - Started 
process 36152 to run task
   [2020-07-08 13:49:05,981] {{logging_mixin.py:112}} INFO - Running %s on host 
%s <TaskInstance: log_test2.t1 2020-07-08T13:48:49.898014+00:00 [running]> 
f7636f8a5dc5
   [2020-07-08 13:49:06,434] {{postgres_operator.py:62}} INFO - Executing: 
select 1;
   [2020-07-08 13:49:06,482] {{logging_mixin.py:112}} INFO - [2020-07-08 
13:49:06,482] {{base_hook.py:87}} INFO - Using connection to: id: 
pg_dc_dwhmeta. Host: 10.5.84.51, Port: 5432, Schema: dc_dwhmeta_dev, Login: 
tech_dwhetl, Password: XXXXXXXX, extra: None
   [2020-07-08 13:49:06,493] {{logging_mixin.py:112}} INFO - [2020-07-08 
13:49:06,493] {{dbapi_hook.py:174}} INFO - select 1;
   [2020-07-08 13:49:06,605] {{taskinstance.py:1065}} INFO - Marking task as 
SUCCESS.dag_id=log_test2, task_id=t1, execution_date=20200708T134849, 
start_date=20200708T134905, end_date=20200708T134906
   [2020-07-08 13:49:14,780] {{logging_mixin.py:112}} INFO - [2020-07-08 
13:49:14,780] {{local_task_job.py:103}} INFO - Task exited with return code 0
   </details>
   
   <details><summary>t2.log</summary> 
   [2020-07-08 13:49:17,503] {{taskinstance.py:669}} INFO - Dependencies all 
met for <TaskInstance: log_test2.t2 2020-07-08T13:48:49.898014+00:00 [queued]>
   [2020-07-08 13:49:17,717] {{taskinstance.py:669}} INFO - Dependencies all 
met for <TaskInstance: log_test2.t2 2020-07-08T13:48:49.898014+00:00 [queued]>
   [2020-07-08 13:49:17,718] {{taskinstance.py:879}} INFO - 
   
--------------------------------------------------------------------------------
   [2020-07-08 13:49:17,718] {{taskinstance.py:880}} INFO - Starting attempt 1 
of 1
   [2020-07-08 13:49:17,718] {{taskinstance.py:881}} INFO - 
   
--------------------------------------------------------------------------------
   [2020-07-08 13:49:17,874] {{taskinstance.py:900}} INFO - Executing 
<Task(PythonOperator): t2> on 2020-07-08T13:48:49.898014+00:00
   [2020-07-08 13:49:17,880] {{standard_task_runner.py:53}} INFO - Started 
process 36181 to run task
   [2020-07-08 13:49:18,385] {{logging_mixin.py:112}} INFO - Running %s on host 
%s <TaskInstance: log_test2.t2 2020-07-08T13:48:49.898014+00:00 [running]> 
f7636f8a5dc5
   [2020-07-08 13:49:18,803] {{taskinstance.py:1145}} ERROR - 
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 
983, in _run_raw_task
       result = task_copy.execute(context=context)
     File 
"/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", 
line 113, in execute
       return_value = self.execute_callable()
     File 
"/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", 
line 118, in execute_callable
       return self.python_callable(*self.op_args, **self.op_kwargs)
     File "/usr/local/airflow/dags/log_test/log_test2.py", line 22, in test
       res = h.get_records("select 1;")
     File "/usr/local/lib/python3.6/site-packages/airflow/hooks/dbapi_hook.py", 
line 115, in get_records
       with closing(self.get_conn()) as conn:
     File 
"/usr/local/lib/python3.6/site-packages/airflow/hooks/postgres_hook.py", line 
71, in get_conn
       conn = self.get_connection(conn_id)
     File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", 
line 87, in get_connection
       log.info("Using connection to: %s", conn.log_info())
     File 
"/usr/local/lib/python3.6/site-packages/airflow/models/connection.py", line 
320, in log_info
       "XXXXXXXX" if self.password else None,
     File 
"/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/attributes.py", line 
358, in get
       retval = self.descriptor.get(instance, owner)
     File 
"/usr/local/lib/python3.6/site-packages/airflow/models/connection.py", line 
190, in get_password
       return fernet.decrypt(bytes(self._password, 'utf-8')).decode()
     File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 
171, in decrypt
       raise InvalidToken
   cryptography.fernet.InvalidToken
   [2020-07-08 13:49:18,806] {{taskinstance.py:1202}} INFO - Marking task as 
FAILED.dag_id=log_test2, task_id=t2, execution_date=20200708T134849, 
start_date=20200708T134917, end_date=20200708T134918
   [2020-07-08 13:49:27,111] {{logging_mixin.py:112}} INFO - [2020-07-08 
13:49:27,111] {{local_task_job.py:103}} INFO - Task exited with return code 1
   </details>
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to