rawwar opened a new issue, #43569:
URL: https://github.com/apache/airflow/issues/43569

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   I created a Kafka connection with dummy values, yet the operator succeeded, producing the following task log:
   
   ```
   [2024-11-01, 07:35:06 IST] {local_task_job_runner.py:123} ▼ Pre task 
execution logs
   [2024-11-01, 07:35:06 IST] {taskinstance.py:2613} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: 
kafka_issue.consume_task manual__2024-11-01T02:05:05.395642+00:00 [queued]>
   [2024-11-01, 07:35:06 IST] {taskinstance.py:2613} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: kafka_issue.consume_task 
manual__2024-11-01T02:05:05.395642+00:00 [queued]>
   [2024-11-01, 07:35:06 IST] {taskinstance.py:2866} INFO - Starting attempt 1 
of 2
   [2024-11-01, 07:35:06 IST] {taskinstance.py:2889} INFO - Executing 
<Task(ConsumeFromTopicOperator): consume_task> on 2024-11-01 
02:05:05.395642+00:00
   [2024-11-01, 07:35:06 IST] {standard_task_runner.py:104} INFO - Running: 
['airflow', 'tasks', 'run', 'kafka_issue', 'consume_task', 
'manual__2024-11-01T02:05:05.395642+00:00', '--job-id', '35', '--raw', 
'--subdir', 'DAGS_FOLDER/kafka_min.py', '--cfg-path', '/tmp/tmpflvhwk6b']
   [2024-11-01, 07:35:06 IST] {standard_task_runner.py:105} INFO - Job 35: 
Subtask consume_task
   [2024-11-01, 07:35:06 IST] {logging_mixin.py:190} WARNING - 
/usr/local/lib/python3.12/site-packages/airflow/task/task_runner/standard_task_runner.py:70
 DeprecationWarning: This process (pid=217) is multi-threaded, use of fork() 
may lead to deadlocks in the child.
   [2024-11-01, 07:35:06 IST] {standard_task_runner.py:72} INFO - Started 
process 218 to run task
   [2024-11-01, 07:35:06 IST] {task_command.py:467} INFO - Running 
<TaskInstance: kafka_issue.consume_task 
manual__2024-11-01T02:05:05.395642+00:00 [running]> on host 65fc791ee8f4
   [2024-11-01, 07:35:06 IST] {taskinstance.py:3132} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='kafka_issue' 
AIRFLOW_CTX_TASK_ID='consume_task' 
AIRFLOW_CTX_EXECUTION_DATE='2024-11-01T02:05:05.395642+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='manual__2024-11-01T02:05:05.395642+00:00'
   [2024-11-01, 07:35:06 IST] {taskinstance.py:731} ▲▲▲ Log group end
   [2024-11-01, 07:35:06 IST] {base.py:84} INFO - Retrieving connection 
'kafka_connection'
   [2024-11-01, 07:36:06 IST] {consume.py:167} INFO - Reached end of log. 
Exiting.
   [2024-11-01, 07:36:06 IST] {consume.py:182} INFO - committing offset at 
end_of_batch
   [2024-11-01, 07:36:06 IST] {taskinstance.py:340} ▼ Post task execution logs
   [2024-11-01, 07:36:06 IST] {taskinstance.py:352} INFO - Marking task as 
SUCCESS. dag_id=kafka_issue, task_id=consume_task, 
run_id=manual__2024-11-01T02:05:05.395642+00:00, 
execution_date=20241101T020505, start_date=20241101T020506, 
end_date=20241101T020606
   [2024-11-01, 07:36:06 IST] {local_task_job_runner.py:266} INFO - Task exited 
with return code 0
   [2024-11-01, 07:36:06 IST] {taskinstance.py:3901} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   [2024-11-01, 07:36:06 IST] {local_task_job_runner.py:245} ▲▲▲ Log group end
   ```
   
   
   ### What you think should happen instead?
   
   When the credentials are wrong, the operator should fail instead of being marked as SUCCESS.
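
   This outcome is consistent with how confluent-kafka surfaces broker and SASL errors: they are reported asynchronously (via logs or an `error_cb`), while `Consumer.poll()` simply returns `None` when no message arrives. A consume loop that treats `None` as "end of log" therefore exits cleanly even if authentication never succeeded. A minimal sketch of that failure mode, with a hypothetical `StubConsumer` standing in for a misconfigured `confluent_kafka.Consumer` (no Kafka dependency needed):

   ```python
   class StubConsumer:
       """Stand-in for confluent_kafka.Consumer with bad credentials:
       poll() never returns a message and never raises, because the
       auth failure is only reported asynchronously."""

       def poll(self, timeout=1.0):
           return None  # no message; the auth error goes to the client log

   def consume_batch(consumer, max_messages=5):
       messages = []
       for _ in range(max_messages):
           msg = consumer.poll()
           if msg is None:
               # Indistinguishable from "no more messages", so the loop
               # exits cleanly and the task is marked SUCCESS.
               print("Reached end of log. Exiting.")
               break
           messages.append(msg)
       return messages

   print(consume_batch(StubConsumer()))  # -> []
   ```

   Failing instead would require the loop to distinguish "no messages yet" from "connection never established", e.g. by surfacing errors through an `error_cb` or failing when zero messages arrive before a deadline.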
   
   ### How to reproduce
   
   The connection configuration (extras) I used:
   ```json
   {
     "bootstrap.servers": "hello.com:9092",
     "group.id": "ea",
     "auto.offset.reset": "earliest",
     "sasl.mechanism": "PLAIN",
     "sasl.username": "ewfew",
     "sasl.password": "Owefw"
   }
   ```
   
   I used the following DAG:
   
   ```python
   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.providers.apache.kafka.operators.consume import 
ConsumeFromTopicOperator
   from datetime import datetime, timedelta
   
   KAFKA_TOPIC = "random"
   
   def print_msg(msg):
       print(msg)
   
   
   default_args = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 1,
       'retry_delay': timedelta(minutes=5)
   }
   
   with DAG('kafka_issue',
            start_date=datetime(2024, 1, 20),
            max_active_runs=1,
            schedule_interval=None,
            default_args=default_args,
            ) as dag:
   
       consumer_topic = ConsumeFromTopicOperator(
           task_id="consume_task",
           kafka_config_id="kafka_connection",        
           topics=[KAFKA_TOPIC],
           apply_function="kafka_min.print_msg",
           commit_cadence="end_of_batch",
           max_messages=5,
           max_batch_size=2,
       )
       testconsume = DummyOperator(
           task_id='test_consume_kafka'
       )
   
   
       testconsume >> consumer_topic 
   ```
   
   To reproduce the issue, name your DAG file `kafka_min.py`, or update `apply_function` accordingly.
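
   Until the operator itself fails on bad credentials, one interim guard (a sketch; the function name and threshold are hypothetical) is to count the consumed messages and fail the run when none arrived, so misconfigured credentials at least surface as a task failure:

   ```python
   def assert_messages_consumed(count: int, minimum: int = 1) -> int:
       """Raise if fewer than `minimum` messages were consumed.

       In a real DAG the count could be accumulated inside the
       apply_function and handed to a downstream check, e.g. via XCom.
       """
       if count < minimum:
           raise RuntimeError(
               f"Consumed {count} messages, expected at least {minimum}; "
               "check the bootstrap servers and SASL credentials."
           )
       return count
   ```

   This is only a workaround for topics that are known to be non-empty; an empty topic with valid credentials would fail too.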
   
   ### Operating System
   
   macOS 15.0.1 (24A348)
   
   ### Versions of Apache Airflow Providers
   
   confluent-kafka
   apache-airflow-providers-apache-kafka
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

