SuccessMoses commented on issue #43569:
URL: https://github.com/apache/airflow/issues/43569#issuecomment-2495488848
@rawwar I created a PR to try to fix this issue, but it is not complete yet.
I wanted to hear your thoughts.
I managed to make the task fail by using `error_cb` in `Consumer`:
```
[2024-11-23, 13:24:36 UTC] {local_task_job_runner.py:121} ▼ Pre task
execution logs
[2024-11-23, 13:24:36 UTC] {taskinstance.py:2403} INFO - Dependencies all
met for dep_context=non-requeueable deps ti=<TaskInstance:
kafka_issue.consume_task manual__2024-11-23T13:24:25.243056+00:00 [queued]>
[2024-11-23, 13:24:36 UTC] {taskinstance.py:2403} INFO - Dependencies all
met for dep_context=requeueable deps ti=<TaskInstance: kafka_issue.consume_task
manual__2024-11-23T13:24:25.243056+00:00 [queued]>
[2024-11-23, 13:24:36 UTC] {taskinstance.py:2654} INFO - Starting attempt 1
of 2
[2024-11-23, 13:24:36 UTC] {taskinstance.py:2677} INFO - Executing
<Task(ConsumeFromTopicOperator): consume_task> on 2024-11-23
13:24:25.243056+00:00
[2024-11-23, 13:24:36 UTC] {standard_task_runner.py:131} INFO - Started
process 15454 to run task
[2024-11-23, 13:24:36 UTC] {standard_task_runner.py:160} INFO - Running:
['airflow', 'tasks', 'run', 'kafka_issue', 'consume_task',
'manual__2024-11-23T13:24:25.243056+00:00', '--raw', '--subdir',
'DAGS_FOLDER/kafka_min.py', '--cfg-path', '/tmp/tmpmivb_e_5']
[2024-11-23, 13:24:36 UTC] {standard_task_runner.py:161} INFO - Subtask
consume_task
[2024-11-23, 13:24:36 UTC] {task_command.py:446} INFO - Running
<TaskInstance: kafka_issue.consume_task
manual__2024-11-23T13:24:25.243056+00:00 [running]> on host 2c7e65d643ac
[2024-11-23, 13:24:36 UTC] {taskinstance.py:2910} INFO - Exporting env vars:
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='kafka_issue'
AIRFLOW_CTX_TASK_ID='consume_task'
AIRFLOW_CTX_LOGICAL_DATE='2024-11-23T13:24:25.243056+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='manual__2024-11-23T13:24:25.243056+00:00'
[2024-11-23, 13:24:36 UTC] {logging_mixin.py:191} INFO - Task instance is in
running state
[2024-11-23, 13:24:36 UTC] {logging_mixin.py:191} INFO - Previous state of
the Task instance: queued
[2024-11-23, 13:24:36 UTC] {logging_mixin.py:191} INFO - Current task
name:consume_task state:running start_date:2024-11-23 13:24:36.673356+00:00
[2024-11-23, 13:24:36 UTC] {logging_mixin.py:191} INFO - Dag
name:kafka_issue and current dag run status:running
[2024-11-23, 13:24:36 UTC] {taskinstance.py:723} ▲▲▲ Log group end
[2024-11-23, 13:24:36 UTC] {base.py:66} INFO - Retrieving connection
'kafka_connection'
[2024-11-23, 13:25:07 UTC] {logging_mixin.py:191} INFO - Exception received:
KafkaError{code=_TRANSPORT,val=-195,str="hello.com:9092/bootstrap: Connection
setup timed out in state CONNECT (after 30403ms in state CONNECT)"}
[2024-11-23, 13:25:36 UTC] {taskinstance.py:3097} ERROR - Task failed with
exception
Traceback (most recent call last):
File "/opt/airflow/airflow/models/taskinstance.py", line 759, in
_execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
File "/opt/airflow/airflow/models/taskinstance.py", line 725, in
_execute_callable
return ExecutionCallableRunner(
File "/opt/airflow/airflow/utils/operator_helpers.py", line 268, in run
return func(*args, **kwargs)
File "/opt/airflow/airflow/models/baseoperator.py", line 375, in wrapper
return func(self, *args, **kwargs)
File
"/opt/airflow/providers/src/airflow/providers/apache/kafka/operators/consume.py",
line 162, in execute
msgs = consumer.consume(num_messages=batch_size,
timeout=self.poll_timeout)
File
"/opt/airflow/providers/src/airflow/providers/apache/kafka/hooks/consume.py",
line 31, in error_callback
print("Exception received: ", err)
File
"/opt/airflow/providers/src/airflow/providers/apache/kafka/hooks/consume.py",
line 32, in error_callback
raise KafkaAuthenticationError(f"Authentication failed: {err}")
airflow.providers.apache.kafka.hooks.consume.KafkaAuthenticationError:
Authentication failed:
KafkaError{code=_TRANSPORT,val=-195,str="hello.com:9092/bootstrap: Connection
setup timed out in state CONNECT (after 30403ms in state CONNECT)"}
[2024-11-23, 13:25:36 UTC] {logging_mixin.py:191} INFO - Task instance in
failure state
[2024-11-23, 13:25:36 UTC] {logging_mixin.py:191} INFO - Task
start:2024-11-23 13:24:36.673356+00:00 end:2024-11-23 13:25:36.809691+00:00
duration:60.136335
[2024-11-23, 13:25:36 UTC] {logging_mixin.py:191} INFO -
Task:<Task(ConsumeFromTopicOperator): consume_task> dag:<DAG: kafka_issue>
dagrun:<DagRun kafka_issue @ 2024-11-23 13:24:25.243056+00:00:
manual__2024-11-23T13:24:25.243056+00:00, state:running, queued_at: 2024-11-23
13:24:25.252615+00:00. externally triggered: True>
[2024-11-23, 13:25:36 UTC] {logging_mixin.py:191} INFO - Failure caused by
Authentication failed:
KafkaError{code=_TRANSPORT,val=-195,str="hello.com:9092/bootstrap: Connection
setup timed out in state CONNECT (after 30403ms in state CONNECT)"}
[2024-11-23, 13:25:36 UTC] {taskinstance.py:1139} INFO - Marking task as
UP_FOR_RETRY. dag_id=kafka_issue, task_id=consume_task,
run_id=manual__2024-11-23T13:24:25.243056+00:00, logical_date=20241123T132425,
start_date=20241123T132436, end_date=20241123T132536
[2024-11-23, 13:25:36 UTC] {taskinstance.py:346} ▼ Post task execution logs
[2024-11-23, 13:25:36 UTC] {standard_task_runner.py:178} ERROR - Failed to
execute task_id=consume_task pid=15454
Traceback (most recent call last):
File "/opt/airflow/airflow/task/standard_task_runner.py", line 171, in
_start_by_fork
ret = args.func(args, dag=self.dag)
File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
return func(*args, **kwargs)
File "/opt/airflow/airflow/utils/cli.py", line 112, in wrapper
return f(*args, **kwargs)
File "/opt/airflow/airflow/cli/commands/task_command.py", line 462, in
task_run
task_return_code = _run_task_by_selected_method(args, _dag, ti)
File "/opt/airflow/airflow/cli/commands/task_command.py", line 257, in
_run_task_by_selected_method
return _run_raw_task(args, ti)
File "/opt/airflow/airflow/cli/commands/task_command.py", line 325, in
_run_raw_task
return ti._run_raw_task(
File "/opt/airflow/airflow/utils/session.py", line 97, in wrapper
return func(*args, session=session, **kwargs)
File "/opt/airflow/airflow/models/taskinstance.py", line 2790, in
_run_raw_task
return _run_raw_task(
File "/opt/airflow/airflow/models/taskinstance.py", line 279, in
_run_raw_task
TaskInstance._execute_task_with_callbacks(
File "/opt/airflow/airflow/models/taskinstance.py", line 2937, in
_execute_task_with_callbacks
result = self._execute_task(context, task_orig)
File "/opt/airflow/airflow/models/taskinstance.py", line 2961, in
_execute_task
return _execute_task(self, context, task_orig)
File "/opt/airflow/airflow/models/taskinstance.py", line 759, in
_execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
File "/opt/airflow/airflow/models/taskinstance.py", line 725, in
_execute_callable
return ExecutionCallableRunner(
File "/opt/airflow/airflow/utils/operator_helpers.py", line 268, in run
return func(*args, **kwargs)
File "/opt/airflow/airflow/models/baseoperator.py", line 375, in wrapper
return func(self, *args, **kwargs)
File
"/opt/airflow/providers/src/airflow/providers/apache/kafka/operators/consume.py",
line 162, in execute
msgs = consumer.consume(num_messages=batch_size,
timeout=self.poll_timeout)
File
"/opt/airflow/providers/src/airflow/providers/apache/kafka/hooks/consume.py",
line 31, in error_callback
print("Exception received: ", err)
File
"/opt/airflow/providers/src/airflow/providers/apache/kafka/hooks/consume.py",
line 32, in error_callback
raise KafkaAuthenticationError(f"Authentication failed: {err}")
airflow.providers.apache.kafka.hooks.consume.KafkaAuthenticationError:
Authentication failed:
KafkaError{code=_TRANSPORT,val=-195,str="hello.com:9092/bootstrap: Connection
setup timed out in state CONNECT (after 30403ms in state CONNECT)"}
[2024-11-23, 13:25:36 UTC] {local_task_job_runner.py:263} INFO - Task exited
with return code 1
[2024-11-23, 13:25:36 UTC] {local_task_job_runner.py:242} ▲▲▲ Log group end
```
This is the connection I used:
```
{
"bootstrap.servers": "hello.com:9092",
"group.id": "ea"
}
```
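For context, here is a minimal sketch of the `error_cb` approach mentioned above. It is not the code from the PR. `error_cb` is a real configuration key of `confluent_kafka.Consumer`, and the traceback in the log shows that an exception raised inside the callback propagates out of `consumer.consume(...)`. The names `KafkaConnectionError`, `build_consumer_config`, and `make_consumer` are hypothetical and used only for illustration. The log above shows a `_TRANSPORT` (connection timeout) error surfacing as `KafkaAuthenticationError`, so this sketch uses a more general exception name and includes the error name in the message.

```python
class KafkaConnectionError(Exception):
    """Hypothetical exception type, used only for this sketch."""


def error_callback(err):
    """Raise instead of letting the client retry silently.

    In real use `err` is a confluent_kafka.KafkaError, which exposes
    name() and str().
    """
    raise KafkaConnectionError(f"Kafka client error {err.name()}: {err.str()}")


def build_consumer_config(base_config):
    """Return a copy of the connection config with error_cb attached.

    A caller-supplied error_cb is kept as-is (an assumption about the
    desired behavior, not something the PR confirms).
    """
    config = dict(base_config)
    config.setdefault("error_cb", error_callback)
    return config


def make_consumer(base_config):
    """Create a Consumer; the import is local so the helpers above can be
    exercised without confluent-kafka installed."""
    from confluent_kafka import Consumer

    return Consumer(build_consumer_config(base_config))
```

With the connection shown above, `make_consumer({"bootstrap.servers": "hello.com:9092", "group.id": "ea"})` would return a consumer whose `consume()` or `poll()` call raises once the client reports an error, which matches the failure seen in the log.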