ikholodkov opened a new issue, #32585:
URL: https://github.com/apache/airflow/issues/32585
### Apache Airflow version
2.6.3
### What happened
While using AwaitMessageTriggerFunctionSensor with an increasing number of
DAG runs, I encountered the exception `cimpl.KafkaException:
KafkaError{code=_NO_OFFSET,val=-168,str="Commit failed: Local: No offset
stored"}`.
I tried setting the consumer count below, equal to, and above the number of
partitions, but the error occurred every time.
Here is the log:
```
[2023-07-13, 14:37:07 UTC] {taskinstance.py:1103} INFO - Dependencies all
met for dep_context=non-requeueable deps ti=<TaskInstance:
kafka_test_dag.await_message scheduled__2023-07-13T14:35:00+00:00 [queued]>
[2023-07-13, 14:37:07 UTC] {taskinstance.py:1103} INFO - Dependencies all
met for dep_context=requeueable deps ti=<TaskInstance:
kafka_test_dag.await_message scheduled__2023-07-13T14:35:00+00:00 [queued]>
[2023-07-13, 14:37:07 UTC] {taskinstance.py:1308} INFO - Starting attempt 1
of 1
[2023-07-13, 14:37:07 UTC] {taskinstance.py:1327} INFO - Executing
<Task(AwaitMessageTriggerFunctionSensor): await_message> on 2023-07-13
14:35:00+00:00
[2023-07-13, 14:37:07 UTC] {standard_task_runner.py:57} INFO - Started
process 8918 to run task
[2023-07-13, 14:37:07 UTC] {standard_task_runner.py:84} INFO - Running:
['airflow', 'tasks', 'run', 'kafka_test_dag', 'await_message',
'scheduled__2023-07-13T14:35:00+00:00', '--job-id', '629111', '--raw',
'--subdir', 'DAGS_FOLDER/dags/kafka_consumers_dag.py', '--cfg-path',
'/tmp/tmp3de57b65']
[2023-07-13, 14:37:07 UTC] {standard_task_runner.py:85} INFO - Job 629111:
Subtask await_message
[2023-07-13, 14:37:08 UTC] {task_command.py:410} INFO - Running
<TaskInstance: kafka_test_dag.await_message
scheduled__2023-07-13T14:35:00+00:00 [running]> on host
airflow-worker-1.airflow-worker.syn-airflow-dev.svc.opus.s.mesh
[2023-07-13, 14:37:08 UTC] {taskinstance.py:1545} INFO - Exporting env vars:
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='kafka_test_dag'
AIRFLOW_CTX_TASK_ID='await_message'
AIRFLOW_CTX_EXECUTION_DATE='2023-07-13T14:35:00+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-13T14:35:00+00:00'
[2023-07-13, 14:37:09 UTC] {taskinstance.py:1415} INFO - Pausing task as
DEFERRED. dag_id=kafka_test_dag, task_id=await_message,
execution_date=20230713T143500, start_date=20230713T143707
[2023-07-13, 14:37:09 UTC] {local_task_job_runner.py:222} INFO - Task exited
with return code 100 (task deferral)
[2023-07-13, 14:38:43 UTC] {taskinstance.py:1103} INFO - Dependencies all
met for dep_context=non-requeueable deps ti=<TaskInstance:
kafka_test_dag.await_message scheduled__2023-07-13T14:35:00+00:00 [queued]>
[2023-07-13, 14:38:43 UTC] {taskinstance.py:1103} INFO - Dependencies all
met for dep_context=requeueable deps ti=<TaskInstance:
kafka_test_dag.await_message scheduled__2023-07-13T14:35:00+00:00 [queued]>
[2023-07-13, 14:38:43 UTC] {taskinstance.py:1306} INFO - Resuming after
deferral
[2023-07-13, 14:38:44 UTC] {taskinstance.py:1327} INFO - Executing
<Task(AwaitMessageTriggerFunctionSensor): await_message> on 2023-07-13
14:35:00+00:00
[2023-07-13, 14:38:44 UTC] {standard_task_runner.py:57} INFO - Started
process 9001 to run task
[2023-07-13, 14:38:44 UTC] {standard_task_runner.py:84} INFO - Running:
['airflow', 'tasks', 'run', 'kafka_test_dag', 'await_message',
'scheduled__2023-07-13T14:35:00+00:00', '--job-id', '629114', '--raw',
'--subdir', 'DAGS_FOLDER/dags/kafka_consumers_dag.py', '--cfg-path',
'/tmp/tmpo6xz234q']
[2023-07-13, 14:38:44 UTC] {standard_task_runner.py:85} INFO - Job 629114:
Subtask await_message
[2023-07-13, 14:38:45 UTC] {task_command.py:410} INFO - Running
<TaskInstance: kafka_test_dag.await_message
scheduled__2023-07-13T14:35:00+00:00 [running]> on host
airflow-worker-1.airflow-worker.airflow-dev.svc.opus.s.mesh
[2023-07-13, 14:38:46 UTC] {taskinstance.py:1598} ERROR - Trigger failed:
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py",
line 537, in cleanup_finished_triggers
result = details["task"].result()
^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py",
line 615, in run_trigger
async for event in trigger.run():
File
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/apache/kafka/triggers/await_message.py",
line 114, in run
await async_commit(asynchronous=False)
File "/home/airflow/.local/lib/python3.11/site-packages/asgiref/sync.py",
line 479, in __call__
ret: _R = await loop.run_in_executor(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in
run
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/asgiref/sync.py",
line 538, in thread_handler
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
cimpl.KafkaException: KafkaError{code=_NO_OFFSET,val=-168,str="Commit
failed: Local: No offset stored"}
[2023-07-13, 14:38:47 UTC] {taskinstance.py:1824} ERROR - Task failed with
exception
airflow.exceptions.TaskDeferralError: Trigger failure
[2023-07-13, 14:38:47 UTC] {taskinstance.py:1345} INFO - Marking task as
FAILED. dag_id=kafka_test_dag, task_id=await_message,
execution_date=20230713T143500, start_date=20230713T143707,
end_date=20230713T143847
[2023-07-13, 14:38:48 UTC] {standard_task_runner.py:104} ERROR - Failed to
execute job 629114 for task await_message (Trigger failure; 9001)
[2023-07-13, 14:38:48 UTC] {local_task_job_runner.py:225} INFO - Task exited
with return code 1
[2023-07-13, 14:38:48 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks
scheduled from follow-on schedule check
```
### What you think should happen instead
The sensor should receive messages without errors, and each message should be
committed exactly once.
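For context, `_NO_OFFSET` is raised locally by librdkafka when `commit()` is called but no offset has been stored since the last commit, which can happen when the trigger resumes without having consumed a new message. A possible defensive pattern is to treat that specific error code as benign. The sketch below is illustrative only, not the provider's actual code: `safe_commit` is a hypothetical helper, and the `exc.args[0].code()` shape mirrors how confluent-kafka wraps a `KafkaError` inside a `KafkaException`.

```python
def safe_commit(consumer):
    """Commit synchronously, swallowing the local 'no offset stored' error.

    _NO_OFFSET (code -168) only means nothing was consumed since the last
    commit, so there is nothing to commit; any other commit failure is
    re-raised. The exception shape (args[0] carrying an object with a
    .code() method) follows confluent-kafka's KafkaException convention
    and is an assumption here.
    """
    NO_OFFSET_CODE = -168  # confluent_kafka.KafkaError._NO_OFFSET
    try:
        consumer.commit(asynchronous=False)
    except Exception as exc:
        err = exc.args[0] if exc.args else None
        if err is not None and getattr(err, "code", lambda: None)() == NO_OFFSET_CODE:
            return  # benign: no stored offset yet, skip the commit
        raise
```

Applied inside the trigger's `run()` loop, this would let the sensor resume after deferral even when no new message arrived between wake-ups.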
### How to reproduce
Example of a DAG:
```
from airflow.decorators import dag
from airflow.models import Variable
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.apache.kafka.sensors.kafka import (
    AwaitMessageTriggerFunctionSensor,
)
from airflow.utils.dates import days_ago
import uuid


def check_message(message):
    if message:
        return True


def trigger_dag(**context):
    TriggerDagRunOperator(
        trigger_dag_id='triggerer_test_dag',
        task_id=f"triggered_downstream_dag_{uuid.uuid4()}"
    ).execute(context)


@dag(
    description="This DAG listens to a Kafka topic and triggers DAGs "
                "based on received messages.",
    schedule_interval='* * * * *',
    start_date=days_ago(2),
    max_active_runs=4,
    catchup=False
)
def kafka_test_dag():
    AwaitMessageTriggerFunctionSensor(
        task_id="await_message",
        topics=['my_test_topic'],
        apply_function="dags.kafka_consumers_dag.check_message",
        event_triggered_function=trigger_dag
    )


kafka_test_dag()
```
### Operating System
Debian GNU/Linux 11 (bullseye)
### Versions of Apache Airflow Providers
apache-airflow-providers-apache-kafka==1.1.2
### Deployment
Other 3rd-party Helm chart
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)