joselfrias opened a new issue, #41661:
URL: https://github.com/apache/airflow/issues/41661
### Apache Airflow version
2.9.3
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
I've been trying to extend the KubernetesPodOperator so that a success
callback runs when the pod completes successfully. The idea is to create a
custom operator that executes the assigned callback every time it is used.
However, when I run it inside a DAG, the logs show no sign that the callback
was executed.
Below is the code of the `CustomKubernetesPodOperator` that I'm trying to
build.
```
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.utils.decorators import apply_defaults


class ExtendedKubernetesPodOperator(KubernetesPodOperator):
    @apply_defaults
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def execute(self, context):
        # Execute the original KubernetesPodOperator logic
        result = super().execute(context)
        # Call the success callback if the pod was successful
        # if self.success_callback and self.pod.status.phase == 'Succeeded':
        default_success_callback(context)
        return result


# Define a default success callback function
def default_success_callback(context):
    print("Pod succeeded!")


class CustomKubernetesPodOperator(ExtendedKubernetesPodOperator):
    @apply_defaults
    def __init__(self, *args, **kwargs):
        if 'on_success_callback' not in kwargs:
            kwargs['on_success_callback'] = default_success_callback
        super().__init__(*args, **kwargs)
```
The DAG that uses `CustomKubernetesPodOperator`:
```
from datetime import datetime
from airflow import DAG
# from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from custom_kubernetes_operator import CustomKubernetesPodOperator
import os

# Define the default_args
default_args = {
    'owner': 'HelloWorldOwner',
    'start_date': datetime(2024, 8, 22),
    'retries': 1,
}

# Define the DAG
dag = DAG(
    'hello_world_kubernetes_pod',
    default_args=default_args,
    description='Hello World DAG',
    schedule_interval=None,
    catchup=False,
)

# Define the KubernetesPodOperator
hello_world = CustomKubernetesPodOperator(
    dag=dag,
    image='ubuntu:latest',
    cmds=["sh", "-c"],
    arguments=[
        """
        echo Hello World
        """
    ],
    name='hello-world-pod',
    task_id="hello_world",
    get_logs=True
)

# Set the task in the DAG
hello_world
```
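
The default-callback injection in `CustomKubernetesPodOperator.__init__` can be checked in isolation from Airflow. Below is a minimal sketch, assuming a hypothetical stand-in class `FakeBaseOperator` that only stores `on_success_callback` (it is not an Airflow class, and `CustomOperator` and `explicit_callback` are likewise illustrative names). It only shows that the `kwargs` handling assigns the default and lets an explicit callback win; it says nothing about when Airflow invokes the callback.

```python
# Plain-Python stand-in; FakeBaseOperator is hypothetical and not part of Airflow.
class FakeBaseOperator:
    def __init__(self, task_id, on_success_callback=None):
        self.task_id = task_id
        self.on_success_callback = on_success_callback


def default_success_callback(context):
    print("Pod succeeded!")


class CustomOperator(FakeBaseOperator):
    def __init__(self, *args, **kwargs):
        # Same effect as the "if key not in kwargs" check used in the issue.
        kwargs.setdefault("on_success_callback", default_success_callback)
        super().__init__(*args, **kwargs)


def explicit_callback(context):
    print("explicit")


# One task relies on the default, the other passes its own callback.
with_default = CustomOperator(task_id="a")
with_explicit = CustomOperator(task_id="b", on_success_callback=explicit_callback)
```

If both attributes hold the expected functions, the constructor logic is not what prevents the message from appearing, and the difference is more likely in how or where the callback output is recorded.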
### What you think should happen instead?
I also created a custom Bash operator, built the same way as the custom
operator above, and in that case the callback is executed successfully and
the message is printed in the logs. The code of that working custom Bash
operator is below. Is there something I am missing in the definition of the
custom Kubernetes pod operator, or should the behaviour be the same?
The custom Bash operator:
```
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.utils.decorators import apply_defaults
from airflow.operators.bash import BashOperator


class ExtendedBashOperator(BashOperator):
    @apply_defaults
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def execute(self, context):
        result = super().execute(context)
        # Call the success callback if the pod was successful
        default_success_callback(context)
        return result


# Define a default success callback function
def default_success_callback(context):
    print("Pod succeeded!")


class CustomBashOperator(ExtendedBashOperator):
    @apply_defaults
    def __init__(self, *args, **kwargs):
        if 'on_success_callback' not in kwargs:
            kwargs['on_success_callback'] = default_success_callback
        super().__init__(*args, **kwargs)
```
The DAG where the bash operator is imported.
```
from datetime import datetime
from airflow import DAG
from custom_bash_operator import CustomBashOperator
import os

# Define the default_args
default_args = {
    'owner': 'Hello World Owner',
    'start_date': datetime(2024, 8, 22),
    'retries': 1,
}

# Define the DAG
dag = DAG(
    'hello_world_bash',
    default_args=default_args,
    description='Hello World DAG',
    schedule_interval=None,
    catchup=False,
)

# Define the CustomBashOperator task
hello_world_bash = CustomBashOperator(
    dag=dag,
    bash_command="echo HELLO",
    name='hello-world-bash',
    task_id="hello_world_bash"
)

# Set the task in the DAG
hello_world_bash
```
### How to reproduce
1. Create a custom Operator that inherits from the KubernetesPodOperator
(similar to above).
2. Create a DAG that will use this Custom Operator, with no success callback
defined in the task definition.
3. Run the DAG.
4. Verify that the callback is not executed.
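
For step 4, a distinctive log marker may be easier to search for than a bare `print`. Below is a minimal sketch, assuming a hypothetical callback name `logging_success_callback`, a hypothetical logger name, and a hypothetical marker string. It reads `dag_id` and `task_id` from the `task_instance` entry of the callback context and logs them through the standard `logging` module. Where callback output ends up may depend on the Airflow version and executor; that is an assumption here, not something this report confirms.

```python
import logging
from types import SimpleNamespace

log = logging.getLogger("custom_operator.callbacks")  # hypothetical logger name


def logging_success_callback(context):
    """Log a searchable marker with the task identifiers from the context."""
    ti = context["task_instance"]
    message = f"SUCCESS_CALLBACK_FIRED dag_id={ti.dag_id} task_id={ti.task_id}"
    log.info(message)
    return message


# Stand-in for the context passed to callbacks; only the fields used above.
fake_context = {
    "task_instance": SimpleNamespace(
        dag_id="hello_world_kubernetes_pod", task_id="hello_world"
    )
}
marker = logging_success_callback(fake_context)
```

Searching the task and scheduler logs for `SUCCESS_CALLBACK_FIRED` after a run then gives an unambiguous answer for both the Kubernetes and the Bash variants.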
### Operating System
Ubuntu 22.04.2 LTS
### Versions of Apache Airflow Providers
_No response_
### Deployment
Virtualenv installation
### Deployment details
Native installation with pip.
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)