joselfrias opened a new issue, #41661:
URL: https://github.com/apache/airflow/issues/41661

   ### Apache Airflow version
   
   2.9.3
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   I've been trying to extend the `KubernetesPodOperator` so that a success 
callback runs whenever the pod completes successfully. The idea is to create a 
custom operator that executes its assigned callback every time the operator is 
used. 
   However, when I run it inside a DAG, the logs suggest that the callback is 
never executed. 
   
   Here is the code of the `CustomKubernetesPodOperator` that I'm trying to 
build:
   ```
   from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
   from airflow.utils.decorators import apply_defaults
   
   class ExtendedKubernetesPodOperator(KubernetesPodOperator):
       @apply_defaults
       def __init__(self, *args, **kwargs):
           super().__init__(*args, **kwargs)
   
       def execute(self, context):
           # Execute the original KubernetesPodOperator logic
           result = super().execute(context)
   
           # Call the success callback if the pod was successful
           #if self.success_callback and self.pod.status.phase == 'Succeeded':
           default_success_callback(context)
   
           return result
   
   # Define a default success callback function
   def default_success_callback(context):
       print("Pod succeeded!")
   
   class CustomKubernetesPodOperator(ExtendedKubernetesPodOperator):
       @apply_defaults
       def __init__(self, *args, **kwargs):
           if 'on_success_callback' not in kwargs:
               kwargs['on_success_callback'] = default_success_callback
           super().__init__(*args, **kwargs)
   ```
   
   And here is the DAG that uses this custom operator:
   ```
   from datetime import datetime
   from airflow import DAG
   #from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
   from custom_kubernetes_operator import CustomKubernetesPodOperator
   import os
   
   # Define the default_args
   default_args = {
       'owner': 'HelloWorldOwner',
       'start_date': datetime(2024, 8, 22),
       'retries': 1,
   }
   
   # Define the DAG
   dag = DAG(
       'hello_world_kubernetes_pod',
       default_args=default_args,
       description='Hello World DAG',
       schedule_interval=None,
       catchup=False,
   )
   
   # Define the KubernetesPodOperator
   hello_world = CustomKubernetesPodOperator(
       dag=dag,
       image='ubuntu:latest',
       cmds=["sh", "-c"],
       arguments=[
           """
           echo Hello World
           """
       ],
       name='hello-world-pod',
       task_id="hello_world",
       get_logs=True
   )
   
   # Set the task in the DAG
   hello_world
   ```
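
To separate the two places where a callback can run in the operator above, here is a minimal stand-in sketch that needs no Airflow installation. `FakeBaseOperator` and `run_task` are hypothetical names that only mimic the control flow: one callback is called directly inside `execute`, the other is passed as `on_success_callback` and invoked by the runner after `execute` returns. It is a model for reasoning about the code, not a reproduction of Airflow internals.

```python
# Stand-in model (no Airflow required). FakeBaseOperator and run_task are
# hypothetical; they imitate only the parts of the flow discussed here.
calls = []


def callback_from_execute(context):
    # Path 1: called explicitly inside execute().
    calls.append("execute")


def callback_from_kwarg(context):
    # Path 2: stored as on_success_callback and called by the runner.
    calls.append("on_success_callback")


class FakeBaseOperator:
    def __init__(self, task_id, on_success_callback=None, **kwargs):
        self.task_id = task_id
        self.on_success_callback = on_success_callback

    def execute(self, context):
        return "done"


class ExtendedOperator(FakeBaseOperator):
    def execute(self, context):
        result = super().execute(context)
        callback_from_execute(context)
        return result


class CustomOperator(ExtendedOperator):
    def __init__(self, **kwargs):
        # Apply the default only when the caller did not pass a callback.
        kwargs.setdefault("on_success_callback", callback_from_kwarg)
        super().__init__(**kwargs)


def run_task(task, context):
    # The runner, not execute(), is what invokes on_success_callback.
    result = task.execute(context)
    if task.on_success_callback is not None:
        task.on_success_callback(context)
    return result


task = CustomOperator(task_id="hello_world")
run_task(task, {"task_id": task.task_id})
print(calls)  # ['execute', 'on_success_callback']
```

In the operator code above both paths use the same `default_success_callback`, so the logs cannot tell them apart. Giving each path its own message, as in this sketch, would show whether neither path fires or only one of them.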
   
   ### What you think should happen instead?
   
   For comparison, I created a custom Bash operator built the same way as the 
custom operator defined above. In that case the callback runs successfully and 
the message is printed in the logs. The code of the working custom Bash 
operator is below. Am I missing something in the definition of the custom 
Kubernetes pod operator, or should the two behave the same?
   
   The custom bash operator:
   ```
   from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
   from airflow.utils.decorators import apply_defaults
   from airflow.operators.bash import BashOperator
   
   class ExtendedBashOperator(BashOperator):
       @apply_defaults
       def __init__(self, *args, **kwargs):
           super().__init__(*args, **kwargs)
   
       def execute(self, context):
           result = super().execute(context)
   
           # Call the success callback once the bash command has finished
           default_success_callback(context)
   
           return result
   
   # Define a default success callback function
   def default_success_callback(context):
       print("Pod succeeded!")
   
   class CustomBashOperator(ExtendedBashOperator):
       @apply_defaults
       def __init__(self, *args, **kwargs):
           if 'on_success_callback' not in kwargs:
               kwargs['on_success_callback'] = default_success_callback
           super().__init__(*args, **kwargs)
   ```
   The DAG that uses the custom Bash operator:
   
   ```
   from datetime import datetime
   from airflow import DAG
   from custom_bash_operator import CustomBashOperator
   import os
   
   
   # Define the default_args
   default_args = {
       'owner': 'Hello World Owner',
       'start_date': datetime(2024, 8, 22),
       'retries': 1,
   }
   
   
   # Define the DAG
   dag = DAG(
       'hello_world_bash',
       default_args=default_args,
       description='Hello World DAG',
       schedule_interval=None,
       catchup=False,
   )
   
   # Define the CustomBashOperator task
   hello_world_bash = CustomBashOperator(
       dag=dag,
       bash_command="echo HELLO",
       name='hello-world-bash',
       task_id="hello_world_bash"
   )
   
   # Set the task in the DAG
   hello_world_bash
   ```
   
   ### How to reproduce
   
   1. Create a custom Operator that inherits from the KubernetesPodOperator 
(similar to above).
   2. Create a DAG that will use this Custom Operator, with no success callback 
defined in the task definition.
   3. Run the DAG.
   4. Verify that the callback is not executed. 
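
For step 4, a callback that logs a distinctive marker can make the check easier than a bare `print`. The sketch below is an assumption-laden helper, not part of the original report: `MARKER`, `marked_success_callback`, and the `source` parameter are hypothetical names, and the context is faked with `SimpleNamespace` so the snippet runs without Airflow. It relies only on the context dictionary exposing the task instance under the `ti` key.

```python
import logging
from functools import partial
from types import SimpleNamespace

log = logging.getLogger("callback_check")

# Hypothetical marker string, chosen so it is easy to search for in logs.
MARKER = "SUCCESS_CALLBACK_FIRED"


def marked_success_callback(context, source="unknown"):
    """Log and return a marker line naming where the call came from."""
    ti = context.get("ti")
    task_id = getattr(ti, "task_id", "<no ti>")
    message = f"{MARKER} source={source} task_id={task_id}"
    log.info(message)
    return message


# Two variants, so the direct call and the kwarg path log different lines.
execute_callback = partial(marked_success_callback, source="execute")
kwarg_callback = partial(marked_success_callback, source="on_success_callback")

# Stand-in for the context Airflow would pass; only `ti.task_id` is used.
fake_context = {"ti": SimpleNamespace(task_id="hello_world")}
result = execute_callback(fake_context)
```

Searching the task log for `SUCCESS_CALLBACK_FIRED` would then show which path, if any, ran. Where the log line ends up depends on the deployment's logging configuration.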
   
   ### Operating System
   
   Ubuntu 22.04.2 LTS
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   Native installation with pip.
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
