klyusba opened a new issue #9721:
URL: https://github.com/apache/airflow/issues/9721
**Apache Airflow version**: 1.10.10
**Environment**: puckel/docker-airflow with celery executor
**What happened**:
Using PostgresHook inside a PythonOperator raises a
cryptography.fernet.InvalidToken error.
A PostgresOperator in the same DAG runs without error.
**What you expected to happen**:
PostgresHook should work without error inside a PythonOperator, just as it
does inside PostgresOperator.
**How to reproduce it**:
<details><summary>dag example</summary>
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
import datetime

default_args = {
    "owner": "airflow",
    "start_date": datetime.datetime(2020, 3, 4),
}

dag = DAG(
    "log_test2",
    default_args=default_args,
    schedule_interval=None
)

def test():
    h = PostgresHook(pg_conn_id="pg_dc_dwhmeta")
    res = h.get_records("select 1;")
    print(res)

t1 = PostgresOperator(
    task_id="t1",
    sql="select 1;",
    postgres_conn_id="pg_dc_dwhmeta",
    dag=dag
)

t2 = PythonOperator(
    task_id='t2',
    python_callable=test,
    dag=dag
)

t1 >> t2
</details>
**Anything else we need to know**:
<details><summary>t1.log</summary>
[2020-07-08 13:49:05,134] {{taskinstance.py:669}} INFO - Dependencies all met for <TaskInstance: log_test2.t1 2020-07-08T13:48:49.898014+00:00 [queued]>
[2020-07-08 13:49:05,333] {{taskinstance.py:669}} INFO - Dependencies all met for <TaskInstance: log_test2.t1 2020-07-08T13:48:49.898014+00:00 [queued]>
[2020-07-08 13:49:05,333] {{taskinstance.py:879}} INFO -
--------------------------------------------------------------------------------
[2020-07-08 13:49:05,333] {{taskinstance.py:880}} INFO - Starting attempt 1 of 1
[2020-07-08 13:49:05,333] {{taskinstance.py:881}} INFO -
--------------------------------------------------------------------------------
[2020-07-08 13:49:05,471] {{taskinstance.py:900}} INFO - Executing <Task(PostgresOperator): t1> on 2020-07-08T13:48:49.898014+00:00
[2020-07-08 13:49:05,477] {{standard_task_runner.py:53}} INFO - Started process 36152 to run task
[2020-07-08 13:49:05,981] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: log_test2.t1 2020-07-08T13:48:49.898014+00:00 [running]> f7636f8a5dc5
[2020-07-08 13:49:06,434] {{postgres_operator.py:62}} INFO - Executing: select 1;
[2020-07-08 13:49:06,482] {{logging_mixin.py:112}} INFO - [2020-07-08 13:49:06,482] {{base_hook.py:87}} INFO - Using connection to: id: pg_dc_dwhmeta. Host: 10.5.84.51, Port: 5432, Schema: dc_dwhmeta_dev, Login: tech_dwhetl, Password: XXXXXXXX, extra: None
[2020-07-08 13:49:06,493] {{logging_mixin.py:112}} INFO - [2020-07-08 13:49:06,493] {{dbapi_hook.py:174}} INFO - select 1;
[2020-07-08 13:49:06,605] {{taskinstance.py:1065}} INFO - Marking task as SUCCESS.dag_id=log_test2, task_id=t1, execution_date=20200708T134849, start_date=20200708T134905, end_date=20200708T134906
[2020-07-08 13:49:14,780] {{logging_mixin.py:112}} INFO - [2020-07-08 13:49:14,780] {{local_task_job.py:103}} INFO - Task exited with return code 0
</details>
<details><summary>t2.log</summary>
[2020-07-08 13:49:17,503] {{taskinstance.py:669}} INFO - Dependencies all met for <TaskInstance: log_test2.t2 2020-07-08T13:48:49.898014+00:00 [queued]>
[2020-07-08 13:49:17,717] {{taskinstance.py:669}} INFO - Dependencies all met for <TaskInstance: log_test2.t2 2020-07-08T13:48:49.898014+00:00 [queued]>
[2020-07-08 13:49:17,718] {{taskinstance.py:879}} INFO -
--------------------------------------------------------------------------------
[2020-07-08 13:49:17,718] {{taskinstance.py:880}} INFO - Starting attempt 1 of 1
[2020-07-08 13:49:17,718] {{taskinstance.py:881}} INFO -
--------------------------------------------------------------------------------
[2020-07-08 13:49:17,874] {{taskinstance.py:900}} INFO - Executing <Task(PythonOperator): t2> on 2020-07-08T13:48:49.898014+00:00
[2020-07-08 13:49:17,880] {{standard_task_runner.py:53}} INFO - Started process 36181 to run task
[2020-07-08 13:49:18,385] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: log_test2.t2 2020-07-08T13:48:49.898014+00:00 [running]> f7636f8a5dc5
[2020-07-08 13:49:18,803] {{taskinstance.py:1145}} ERROR -
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/log_test/log_test2.py", line 22, in test
    res = h.get_records("select 1;")
  File "/usr/local/lib/python3.6/site-packages/airflow/hooks/dbapi_hook.py", line 115, in get_records
    with closing(self.get_conn()) as conn:
  File "/usr/local/lib/python3.6/site-packages/airflow/hooks/postgres_hook.py", line 71, in get_conn
    conn = self.get_connection(conn_id)
  File "/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", line 87, in get_connection
    log.info("Using connection to: %s", conn.log_info())
  File "/usr/local/lib/python3.6/site-packages/airflow/models/connection.py", line 320, in log_info
    "XXXXXXXX" if self.password else None,
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/attributes.py", line 358, in get
    retval = self.descriptor.get(instance, owner)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/connection.py", line 190, in get_password
    return fernet.decrypt(bytes(self._password, 'utf-8')).decode()
  File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 171, in decrypt
    raise InvalidToken
cryptography.fernet.InvalidToken
[2020-07-08 13:49:18,806] {{taskinstance.py:1202}} INFO - Marking task as FAILED.dag_id=log_test2, task_id=t2, execution_date=20200708T134849, start_date=20200708T134917, end_date=20200708T134918
[2020-07-08 13:49:27,111] {{logging_mixin.py:112}} INFO - [2020-07-08 13:49:27,111] {{local_task_job.py:103}} INFO - Task exited with return code 1
</details>
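One difference between the two tasks may be worth ruling out: the callable passes `pg_conn_id=` to PostgresHook, while the operator passes `postgres_conn_id=`. Assuming the 1.10 hook reads its connection id from the keyword named by its `conn_name_attr` and otherwise falls back to a default, t2 would be reading a different connection (possibly `postgres_default`) whose stored password cannot be decrypted with the current Fernet key. The sketch below is a simplified stand-in for that assumed lookup; the class and function names are hypothetical and this is not Airflow's actual code.

```python
class DbApiHookSketch:
    """Simplified stand-in for a DB-API style hook (hypothetical, not Airflow's code)."""

    conn_name_attr = "conn_id"
    default_conn_name = "default_conn_id"

    def __init__(self, *args, **kwargs):
        if len(args) == 1:
            # A single positional argument is taken as the connection id.
            conn_id = args[0]
        else:
            # Only the keyword named by conn_name_attr is consulted;
            # any other keyword (e.g. pg_conn_id) is silently ignored.
            conn_id = kwargs.get(self.conn_name_attr, self.default_conn_name)
        setattr(self, self.conn_name_attr, conn_id)


class PostgresHookSketch(DbApiHookSketch):
    # Assumed values for the Postgres hook in 1.10.x.
    conn_name_attr = "postgres_conn_id"
    default_conn_name = "postgres_default"


def resolved_conn_id(hook):
    """Return the connection id the hook would look up."""
    return getattr(hook, hook.conn_name_attr)


as_in_callable = PostgresHookSketch(pg_conn_id="pg_dc_dwhmeta")
as_in_operator = PostgresHookSketch(postgres_conn_id="pg_dc_dwhmeta")

print(resolved_conn_id(as_in_callable))
print(resolved_conn_id(as_in_operator))
```

If this is what happens, changing the callable to `PostgresHook(postgres_conn_id="pg_dc_dwhmeta")` should make t2 use the same connection as t1. This has not been verified against the environment above.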