rawwar opened a new issue, #43569:
URL: https://github.com/apache/airflow/issues/43569
### Apache Airflow version
Other Airflow 2 version (please specify below)
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
I created a Kafka connection with dummy values, yet the operator succeeded, producing the following task log:
```
[2024-11-01, 07:35:06 IST] {local_task_job_runner.py:123} ▼ Pre task execution logs
[2024-11-01, 07:35:06 IST] {taskinstance.py:2613} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: kafka_issue.consume_task manual__2024-11-01T02:05:05.395642+00:00 [queued]>
[2024-11-01, 07:35:06 IST] {taskinstance.py:2613} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: kafka_issue.consume_task manual__2024-11-01T02:05:05.395642+00:00 [queued]>
[2024-11-01, 07:35:06 IST] {taskinstance.py:2866} INFO - Starting attempt 1 of 2
[2024-11-01, 07:35:06 IST] {taskinstance.py:2889} INFO - Executing <Task(ConsumeFromTopicOperator): consume_task> on 2024-11-01 02:05:05.395642+00:00
[2024-11-01, 07:35:06 IST] {standard_task_runner.py:104} INFO - Running: ['airflow', 'tasks', 'run', 'kafka_issue', 'consume_task', 'manual__2024-11-01T02:05:05.395642+00:00', '--job-id', '35', '--raw', '--subdir', 'DAGS_FOLDER/kafka_min.py', '--cfg-path', '/tmp/tmpflvhwk6b']
[2024-11-01, 07:35:06 IST] {standard_task_runner.py:105} INFO - Job 35: Subtask consume_task
[2024-11-01, 07:35:06 IST] {logging_mixin.py:190} WARNING - /usr/local/lib/python3.12/site-packages/airflow/task/task_runner/standard_task_runner.py:70 DeprecationWarning: This process (pid=217) is multi-threaded, use of fork() may lead to deadlocks in the child.
[2024-11-01, 07:35:06 IST] {standard_task_runner.py:72} INFO - Started process 218 to run task
[2024-11-01, 07:35:06 IST] {task_command.py:467} INFO - Running <TaskInstance: kafka_issue.consume_task manual__2024-11-01T02:05:05.395642+00:00 [running]> on host 65fc791ee8f4
[2024-11-01, 07:35:06 IST] {taskinstance.py:3132} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='kafka_issue' AIRFLOW_CTX_TASK_ID='consume_task' AIRFLOW_CTX_EXECUTION_DATE='2024-11-01T02:05:05.395642+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-11-01T02:05:05.395642+00:00'
[2024-11-01, 07:35:06 IST] {taskinstance.py:731} ▲▲▲ Log group end
[2024-11-01, 07:35:06 IST] {base.py:84} INFO - Retrieving connection 'kafka_connection'
[2024-11-01, 07:36:06 IST] {consume.py:167} INFO - Reached end of log. Exiting.
[2024-11-01, 07:36:06 IST] {consume.py:182} INFO - committing offset at end_of_batch
[2024-11-01, 07:36:06 IST] {taskinstance.py:340} ▼ Post task execution logs
[2024-11-01, 07:36:06 IST] {taskinstance.py:352} INFO - Marking task as SUCCESS. dag_id=kafka_issue, task_id=consume_task, run_id=manual__2024-11-01T02:05:05.395642+00:00, execution_date=20241101T020505, start_date=20241101T020506, end_date=20241101T020606
[2024-11-01, 07:36:06 IST] {local_task_job_runner.py:266} INFO - Task exited with return code 0
[2024-11-01, 07:36:06 IST] {taskinstance.py:3901} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2024-11-01, 07:36:06 IST] {local_task_job_runner.py:245} ▲▲▲ Log group end
```
### What you think should happen instead?
When the credentials are invalid (or the broker is unreachable), the operator should fail rather than report success.
### How to reproduce
Connection I used:
```
{
  "bootstrap.servers": "hello.com:9092",
  "group.id": "ea",
  "auto.offset.reset": "earliest",
  "sasl.mechanism": "PLAIN",
  "sasl.username": "ewfew",
  "sasl.password": "Owefw"
}
```
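A side observation worth verifying: to my knowledge, librdkafka only applies `sasl.*` settings when `security.protocol` is set to a SASL variant. The connection above leaves it at the default `PLAINTEXT`, so the dummy username/password are likely never sent and authentication cannot fail in the first place. A config that would actually exercise SASL would need something like:

```
{
  "bootstrap.servers": "hello.com:9092",
  "group.id": "ea",
  "auto.offset.reset": "earliest",
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "PLAIN",
  "sasl.username": "ewfew",
  "sasl.password": "Owefw"
}
```

Even so, an unreachable `bootstrap.servers` should also fail the task, which points at error handling in the consume loop rather than at the config alone.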
I used the following DAG:
```
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from datetime import datetime, timedelta
import json

KAFKA_TOPIC = "random"


def print_msg(msg):
    print(msg)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG('kafka_issue',
         start_date=datetime(2024, 1, 20),
         max_active_runs=1,
         schedule_interval=None,
         default_args=default_args,
         ) as dag:

    consumer_topic = ConsumeFromTopicOperator(
        task_id="consume_task",
        kafka_config_id="kafka_connection",
        topics=[KAFKA_TOPIC],
        apply_function="kafka_min.print_msg",
        commit_cadence="end_of_batch",
        max_messages=5,
        max_batch_size=2,
    )

    testconsume = DummyOperator(
        task_id='test_consume_kafka'
    )

    testconsume >> consumer_topic
```
To reproduce the issue, name your DAG file `kafka_min.py` (or update `apply_function` accordingly).
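Until the operator surfaces these errors itself, one possible workaround (my own sketch, not a provider API) is to fail fast with a metadata pre-check before consuming; `Consumer.list_topics()` raises a `KafkaException` when the cluster cannot be reached within the timeout:

```python
# Hypothetical pre-check (not part of the Kafka provider): fetch cluster
# metadata up front so an unreachable broker fails the task explicitly
# instead of the consume loop silently seeing "no messages".
from confluent_kafka import Consumer, KafkaException


def assert_kafka_reachable(conf: dict, timeout: float = 5.0) -> None:
    """Raise KafkaException if cluster metadata cannot be fetched in time."""
    consumer = Consumer(conf)
    try:
        consumer.list_topics(timeout=timeout)
    finally:
        consumer.close()
</antml>```

This could run in a short `PythonOperator` upstream of `consume_task`, using the same connection extras, so a bad `bootstrap.servers` fails the DAG run early.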
### Operating System
MacOS - 15.0.1 (24A348)
### Versions of Apache Airflow Providers
confluent-kafka
apache-airflow-providers-apache-kafka
### Deployment
Astronomer
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)