ImmortalLotus opened a new issue, #35179:
URL: https://github.com/apache/airflow/issues/35179
### Apache Airflow version
2.7.2
### What happened
Airflow throws the error below when terminating a task from a DAG created with the `@dag` decorator, and then fails to terminate the pod when using the Kubernetes Executor.
The task itself runs correctly, but while the pod is being shut down this error is raised and the pod is not deleted.
Exit log:
```
[2023-10-25T13:23:21.856+0000] {local_task_job_runner.py:228} INFO - Task exited with return code 0
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/__main__.py", line 60, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 113, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 430, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 208, in _run_task_by_selected_method
    return _run_task_by_local_task_job(args, ti)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 270, in _run_task_by_local_task_job
    ret = run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 77, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airf
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/job.py", line 318, in execute_job
    ret = execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job_runner.py", line 192, in _execute
    self.handle_task_exit(return_code)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job_runner.py", line 232, in handle_task_exit
    self.task_instance.schedule_downstream_tasks(max_tis_per_query=self.job.max_tis_per_query)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 77, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2754, in schedule_downstream_tasks
    partial_dag = task.dag.partial_subset(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2393, in partial_subset
    dag.task_dict = {
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2394, in <dictcomp>
    t.task_id: _deepcopy_task(t)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2391, in _deepcopy_task
    return copy.deepcopy(t, memo)
  File "/usr/local/lib/python3.8/copy.py", line 153, in deepcopy
    y = copier(memo)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1214, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.8/copy.py", line 161, in deepcopy
    rv = reductor(4)
TypeError: cannot pickle 'module' object
```
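The final frames show `copy.deepcopy` descending from `BaseOperator.__deepcopy__` through two nested dicts and several objects until it reaches a module, which cannot be pickled. As an illustration only (plain Python, no Airflow), the sketch below builds a similar chain. `Holder` and `deepcopy_error` are hypothetical names, and `json` merely stands in for whatever module the real objects reference (presumably a DB-API module held by the SQLAlchemy engine):

```python
import copy
import json  # stands in for any module object, e.g. a DB-API module


class Holder:
    """Hypothetical object that keeps a module in its attributes."""

    def __init__(self):
        self.dbapi = json


def deepcopy_error(obj):
    """Return the TypeError message raised by copy.deepcopy(obj), or None."""
    try:
        copy.deepcopy(obj)
    except TypeError as exc:
        return str(exc)
    return None


# Nested dicts -> object -> module, roughly the shape seen in the traceback
message = deepcopy_error({"op_kwargs": {"tabela": "YourTable", "self": Holder()}})
print(message)
```

Plain data such as `{"tabela": "YourTable"}` deep-copies without error; the failure only appears once a module is reachable from the copied object.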
### What you think should happen instead
It should terminate correctly and delete the pod.
### How to reproduce
Use the following class (which is still not perfect) to generate a DAG as shown below. The purpose of this class is to encapsulate some common patterns we use when working on specific EL jobs:
```
from __future__ import annotations
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from docker.types import Mount
from kubernetes.client import models as k8s
from airflow.models import Variable
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from typing import TYPE_CHECKING, Any
import httpx
import pendulum
from sqlalchemy import create_engine
from airflow.decorators import dag, task
from airflow.models.baseoperator import BaseOperator
from airflow.operators.email import EmailOperator
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator

if TYPE_CHECKING:
    from airflow.utils.context import Context

# [START dag_decorator_usage]
class sql_dag():
    def __init__(self, nome_servidor_destino, servidor_origem_dict: dict,
                 nome_banco_destino, nome_banco_origem, schema):
        # Destination: MSSQL hook plus an SQLAlchemy engine used by DataFrame.to_sql
        conexao = MsSqlHook.get_connection(nome_servidor_destino)
        self.hook_banco_destino = MsSqlHook(mssql_conn_id=nome_servidor_destino)
        self.engine_destino = create_engine(
            f'mssql+pyodbc://{conexao.login}:{conexao.password}@{conexao.host}/{nome_banco_destino}?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=YES'
        )
        # Source: choose the hook class from the {connection_id: database_type} mapping
        for chave, valor in servidor_origem_dict.items():
            if valor == 'Postgres':
                self.hook_banco_origem = PostgresHook(postgres_conn_id=chave)
            elif valor == 'MSSQL':
                self.hook_banco_origem = MsSqlHook(mssql_conn_id=chave)
            else:
                raise Exception("banco não suportado")  # "database not supported"
        self.schema = schema
        self.sql_template_padrao = f'use [{nome_banco_origem}] select * from '
        self.truncate_sql = f"""use [{nome_banco_destino}] truncate table """

    def configurar_dag(self, horario: str, nome: str, origem, acao, destino):
        # JSON Variable named after the DAG, e.g. {"YourTable": "padraoSemDtInsert"}
        tabela_dict = Variable.get(nome, default_var={"erro": ["erro"]},
                                   deserialize_json=True)

        def truncate_tabelas(**kwargs):
            # Truncate one destination table
            sql3 = kwargs['self'].truncate_sql + kwargs['tabela']
            kwargs['self'].hook_banco_destino.run(sql3)

        def insert_tabelas(**kwargs):
            # Copy one table from the source into the destination via pandas
            sql4 = kwargs['self'].sql_template_padrao + kwargs['tabela']
            df = kwargs['self'].hook_banco_origem.get_pandas_df(sql4)
            df.to_sql(kwargs['tabela'], schema=kwargs['self'].schema,
                      con=kwargs['self'].engine_destino, if_exists='append',
                      index=False)

        @dag(
            schedule=horario,
            dag_id=nome,
            start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
            catchup=False,
            tags=[origem, acao, destino]
        )
        def criar():
            start = DummyOperator(
                task_id='comecaDag'
            )
            end = DummyOperator(
                task_id='endDag'
            )
            with TaskGroup(group_id=f"insert_tabelas") as insert:
                # Placeholder task ("to avoid errors") so the group is never empty
                se_estiver_vazio = DummyOperator(
                    task_id='pra_evitar_erros'
                )
                for chave, valor in tabela_dict.items():
                    if valor == 'padraoSemDtInsert':
                        task_insert_tabelas = PythonOperator(
                            task_id=f'insert_{chave}',
                            python_callable=insert_tabelas,
                            op_kwargs={"tabela": chave, "self": self}
                        )
            with TaskGroup(group_id=f"truncate_tabelas") as truncate:
                se_estiver_vazio2 = DummyOperator(
                    task_id='pra_evitar_erros2'
                )
                for chave, valor in tabela_dict.items():
                    task_truncate_tabelas = PythonOperator(
                        task_id=f'truncate_{chave}',
                        python_callable=truncate_tabelas,
                        op_kwargs={"tabela": chave, "self": self}
                    )
            start >> truncate >> insert >> end

        dag_teste = criar()
```
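One possible workaround, sketched here in plain Python and not verified against Airflow 2.7.2: the traceback shows `BaseOperator.__deepcopy__` deep-copying the operator's attributes, and the objects reachable from them presumably include the `sql_dag` instance passed as `op_kwargs={"tabela": chave, "self": self}`. If that object defines `__deepcopy__` and returns itself, `copy.deepcopy` reuses it instead of descending into its engine and hooks. `SharedResources` is a hypothetical stand-in for `sql_dag`:

```python
import copy
import json  # stands in for a module referenced by the shared object


class SharedResources:
    """Hypothetical stand-in for sql_dag: it keeps a module reference,
    which copy.deepcopy cannot copy on its own."""

    def __init__(self):
        self.dbapi = json

    def __deepcopy__(self, memo):
        # Return the same instance instead of copying it, so deepcopy never
        # descends into attributes such as the module reference above.
        return self


op_kwargs = {"tabela": "YourTable", "self": SharedResources()}
# Roughly mimics a deep copy of an operator's stored keyword arguments
copied = copy.deepcopy({"op_kwargs": op_kwargs})

print(copied["op_kwargs"] is op_kwargs)                  # False: dicts are copied
print(copied["op_kwargs"]["self"] is op_kwargs["self"])  # True: instance is shared
```

Sharing one instance is harmless here only because `truncate_tabelas` and `insert_tabelas` read from it without mutating it. An alternative that sidesteps deep copies entirely is to pass only strings (connection IDs, database and table names) in `op_kwargs` and build the hooks and engine inside the callables.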
Code that uses the class above; it works with an MSSQL destination and a PostgreSQL or MSSQL source:
```
from cagd_libs.BANCOS_SQL import sql_dag
from airflow.models import Variable

tabela_dict = {"YourTable": "padraoSemDtInsert"}
nome_variavel = "ID_Of_the_DAG_YOULL_CREATE"
Variable.set(key=nome_variavel, value=tabela_dict, serialize_json=True)

dag_sql = sql_dag(
    nome_servidor_destino='ID_OF_THE_DESTINATION_DATABASE_IN_AIRFLOW',
    servidor_origem_dict={'ID_OF_THE_SOURCE_DATABASE_IN_AIRFLOW': 'Postgres'},
    nome_banco_destino='destination_database_name',
    nome_banco_origem='source_database_name',
    schema='database_schema',
)
teste_dag = dag_sql.configurar_dag('airflow_schedule', nome_variavel, "tags_that_we_use", "tags_that_we_use", "tags_that_we_use")
```
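For reference, the loops in `configurar_dag` determine which task IDs (and therefore which pods, since the Kubernetes Executor runs each task instance in its own pod) every generated DAG contains. The plain-Python sketch below mirrors those loops; `planned_task_ids` is a hypothetical helper, and it assumes Airflow's default `TaskGroup` behaviour of prefixing child task IDs with `group_id.`:

```python
from __future__ import annotations


def planned_task_ids(tabela_dict: dict) -> list[str]:
    """Hypothetical helper mirroring the loops in configurar_dag."""
    ids = ["comecaDag", "endDag"]
    # insert_tabelas group: placeholder task plus one PythonOperator
    # per table whose value is 'padraoSemDtInsert'
    ids.append("insert_tabelas.pra_evitar_erros")
    for chave, valor in tabela_dict.items():
        if valor == "padraoSemDtInsert":
            ids.append(f"insert_tabelas.insert_{chave}")
    # truncate_tabelas group: placeholder task plus one PythonOperator per table
    ids.append("truncate_tabelas.pra_evitar_erros2")
    for chave in tabela_dict:
        ids.append(f"truncate_tabelas.truncate_{chave}")
    return ids


print(planned_task_ids({"YourTable": "padraoSemDtInsert"}))
```

With the Variable's default value `{"erro": ["erro"]}`, no insert task is generated, because the value is a list rather than `'padraoSemDtInsert'`; only `truncate_tabelas.truncate_erro` is added besides the fixed tasks.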
### Operating System
Red Hat OpenShift, Airflow Helm chart
### Versions of Apache Airflow Providers
pip install apache-airflow-providers-odbc \
&& pip install apache-airflow-providers-microsoft-mssql
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else
This error occurs on every DAG created with the class above. Regardless of whether the task succeeds or fails, the pod emits the log cited above and is then not deleted.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)