SuccessMoses commented on issue #43569:
URL: https://github.com/apache/airflow/issues/43569#issuecomment-2495488848

   @rawwar I created a PR to try to fix this issue, but it is not complete yet. 
I wanted to hear your thoughts.
   
   I managed to make the task fail by raising an exception from an `error_cb` callback passed to `Consumer`.
   
   ```
   [2024-11-23, 13:24:36 UTC] {local_task_job_runner.py:121} ▼ Pre task 
execution logs
   [2024-11-23, 13:24:36 UTC] {taskinstance.py:2403} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: 
kafka_issue.consume_task manual__2024-11-23T13:24:25.243056+00:00 [queued]>
   [2024-11-23, 13:24:36 UTC] {taskinstance.py:2403} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: kafka_issue.consume_task 
manual__2024-11-23T13:24:25.243056+00:00 [queued]>
   [2024-11-23, 13:24:36 UTC] {taskinstance.py:2654} INFO - Starting attempt 1 
of 2
   [2024-11-23, 13:24:36 UTC] {taskinstance.py:2677} INFO - Executing 
<Task(ConsumeFromTopicOperator): consume_task> on 2024-11-23 
13:24:25.243056+00:00
   [2024-11-23, 13:24:36 UTC] {standard_task_runner.py:131} INFO - Started 
process 15454 to run task
   [2024-11-23, 13:24:36 UTC] {standard_task_runner.py:160} INFO - Running: 
['airflow', 'tasks', 'run', 'kafka_issue', 'consume_task', 
'manual__2024-11-23T13:24:25.243056+00:00', '--raw', '--subdir', 
'DAGS_FOLDER/kafka_min.py', '--cfg-path', '/tmp/tmpmivb_e_5']
   [2024-11-23, 13:24:36 UTC] {standard_task_runner.py:161} INFO - Subtask 
consume_task
   [2024-11-23, 13:24:36 UTC] {task_command.py:446} INFO - Running 
<TaskInstance: kafka_issue.consume_task 
manual__2024-11-23T13:24:25.243056+00:00 [running]> on host 2c7e65d643ac
   [2024-11-23, 13:24:36 UTC] {taskinstance.py:2910} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='kafka_issue' 
AIRFLOW_CTX_TASK_ID='consume_task' 
AIRFLOW_CTX_LOGICAL_DATE='2024-11-23T13:24:25.243056+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='manual__2024-11-23T13:24:25.243056+00:00'
   [2024-11-23, 13:24:36 UTC] {logging_mixin.py:191} INFO - Task instance is in 
running state
   [2024-11-23, 13:24:36 UTC] {logging_mixin.py:191} INFO -  Previous state of 
the Task instance: queued
   [2024-11-23, 13:24:36 UTC] {logging_mixin.py:191} INFO - Current task 
name:consume_task state:running start_date:2024-11-23 13:24:36.673356+00:00
   [2024-11-23, 13:24:36 UTC] {logging_mixin.py:191} INFO - Dag 
name:kafka_issue and current dag run status:running
   [2024-11-23, 13:24:36 UTC] {taskinstance.py:723} ▲▲▲ Log group end
   [2024-11-23, 13:24:36 UTC] {base.py:66} INFO - Retrieving connection 
'kafka_connection'
   [2024-11-23, 13:25:07 UTC] {logging_mixin.py:191} INFO - Exception received: 
 KafkaError{code=_TRANSPORT,val=-195,str="hello.com:9092/bootstrap: Connection 
setup timed out in state CONNECT (after 30403ms in state CONNECT)"}
   [2024-11-23, 13:25:36 UTC] {taskinstance.py:3097} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File "/opt/airflow/airflow/models/taskinstance.py", line 759, in 
_execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
     File "/opt/airflow/airflow/models/taskinstance.py", line 725, in 
_execute_callable
       return ExecutionCallableRunner(
     File "/opt/airflow/airflow/utils/operator_helpers.py", line 268, in run
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/models/baseoperator.py", line 375, in wrapper
       return func(self, *args, **kwargs)
     File 
"/opt/airflow/providers/src/airflow/providers/apache/kafka/operators/consume.py",
 line 162, in execute
       msgs = consumer.consume(num_messages=batch_size, 
timeout=self.poll_timeout)
     File 
"/opt/airflow/providers/src/airflow/providers/apache/kafka/hooks/consume.py", 
line 31, in error_callback
       print("Exception received: ", err)
     File 
"/opt/airflow/providers/src/airflow/providers/apache/kafka/hooks/consume.py", 
line 32, in error_callback
       raise KafkaAuthenticationError(f"Authentication failed: {err}")
   airflow.providers.apache.kafka.hooks.consume.KafkaAuthenticationError: 
Authentication failed: 
KafkaError{code=_TRANSPORT,val=-195,str="hello.com:9092/bootstrap: Connection 
setup timed out in state CONNECT (after 30403ms in state CONNECT)"}
   [2024-11-23, 13:25:36 UTC] {logging_mixin.py:191} INFO - Task instance in 
failure state
   [2024-11-23, 13:25:36 UTC] {logging_mixin.py:191} INFO - Task 
start:2024-11-23 13:24:36.673356+00:00 end:2024-11-23 13:25:36.809691+00:00 
duration:60.136335
   [2024-11-23, 13:25:36 UTC] {logging_mixin.py:191} INFO - 
Task:<Task(ConsumeFromTopicOperator): consume_task> dag:<DAG: kafka_issue> 
dagrun:<DagRun kafka_issue @ 2024-11-23 13:24:25.243056+00:00: 
manual__2024-11-23T13:24:25.243056+00:00, state:running, queued_at: 2024-11-23 
13:24:25.252615+00:00. externally triggered: True>
   [2024-11-23, 13:25:36 UTC] {logging_mixin.py:191} INFO - Failure caused by 
Authentication failed: 
KafkaError{code=_TRANSPORT,val=-195,str="hello.com:9092/bootstrap: Connection 
setup timed out in state CONNECT (after 30403ms in state CONNECT)"}
   [2024-11-23, 13:25:36 UTC] {taskinstance.py:1139} INFO - Marking task as 
UP_FOR_RETRY. dag_id=kafka_issue, task_id=consume_task, 
run_id=manual__2024-11-23T13:24:25.243056+00:00, logical_date=20241123T132425, 
start_date=20241123T132436, end_date=20241123T132536
   [2024-11-23, 13:25:36 UTC] {taskinstance.py:346} ▼ Post task execution logs
   [2024-11-23, 13:25:36 UTC] {standard_task_runner.py:178} ERROR - Failed to 
execute task_id=consume_task pid=15454
   Traceback (most recent call last):
     File "/opt/airflow/airflow/task/standard_task_runner.py", line 171, in 
_start_by_fork
       ret = args.func(args, dag=self.dag)
     File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/utils/cli.py", line 112, in wrapper
       return f(*args, **kwargs)
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 462, in 
task_run
       task_return_code = _run_task_by_selected_method(args, _dag, ti)
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 257, in 
_run_task_by_selected_method
       return _run_raw_task(args, ti)
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 325, in 
_run_raw_task
       return ti._run_raw_task(
     File "/opt/airflow/airflow/utils/session.py", line 97, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/airflow/airflow/models/taskinstance.py", line 2790, in 
_run_raw_task
       return _run_raw_task(
     File "/opt/airflow/airflow/models/taskinstance.py", line 279, in 
_run_raw_task
       TaskInstance._execute_task_with_callbacks(
     File "/opt/airflow/airflow/models/taskinstance.py", line 2937, in 
_execute_task_with_callbacks
       result = self._execute_task(context, task_orig)
     File "/opt/airflow/airflow/models/taskinstance.py", line 2961, in 
_execute_task
       return _execute_task(self, context, task_orig)
     File "/opt/airflow/airflow/models/taskinstance.py", line 759, in 
_execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
     File "/opt/airflow/airflow/models/taskinstance.py", line 725, in 
_execute_callable
       return ExecutionCallableRunner(
     File "/opt/airflow/airflow/utils/operator_helpers.py", line 268, in run
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/models/baseoperator.py", line 375, in wrapper
       return func(self, *args, **kwargs)
     File 
"/opt/airflow/providers/src/airflow/providers/apache/kafka/operators/consume.py",
 line 162, in execute
       msgs = consumer.consume(num_messages=batch_size, 
timeout=self.poll_timeout)
     File 
"/opt/airflow/providers/src/airflow/providers/apache/kafka/hooks/consume.py", 
line 31, in error_callback
       print("Exception received: ", err)
     File 
"/opt/airflow/providers/src/airflow/providers/apache/kafka/hooks/consume.py", 
line 32, in error_callback
       raise KafkaAuthenticationError(f"Authentication failed: {err}")
   airflow.providers.apache.kafka.hooks.consume.KafkaAuthenticationError: 
Authentication failed: 
KafkaError{code=_TRANSPORT,val=-195,str="hello.com:9092/bootstrap: Connection 
setup timed out in state CONNECT (after 30403ms in state CONNECT)"}
   [2024-11-23, 13:25:36 UTC] {local_task_job_runner.py:263} INFO - Task exited 
with return code 1
   [2024-11-23, 13:25:36 UTC] {local_task_job_runner.py:242} ▲▲▲ Log group end
   ```
   
   This is the connection configuration I used:
   
   ```
   {
     "bootstrap.servers": "hello.com:9092",
     "group.id": "ea"
   }
   ```

