pauloventurab commented on issue #31330:
URL: https://github.com/apache/airflow/issues/31330#issuecomment-1563127948

   Hello again, 
   
   In case it helps, I think I got something working based on this 
example DAG: 
https://github.com/apache/airflow/blob/providers-apache-kafka/1.1.0/tests/system/providers/apache/kafka/example_dag_event_listener.py
   
   The only problem is that I can't use the Confluent Cloud parameters:
   
   ```
   extra=json.dumps(
       {
           "bootstrap.servers": "hide.gcp.confluent.cloud:9092",
           "security_protocol": "SASL_SSL",
           "sasl_mechanism": "PLAIN",
           "sasl_plain_username": "hide",
           "sasl_plain_password": "hide",
           "group.id": "my-group",
           "enable.auto.commit": False,
           "auto.offset.reset": "earliest",
       }
   )
   ```
   
   Error: 
   
   `Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/triggerer_job_runner.py", line 529, in cleanup_finished_triggers
       result = details["task"].result()
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/triggerer_job_runner.py", line 607, in run_trigger
       async for event in trigger.run():
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/apache/kafka/triggers/await_message.py", line 91, in run
       consumer_hook = KafkaConsumerHook(topics=self.topics, kafka_config_id=self.kafka_config_id)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/apache/kafka/hooks/consume.py", line 36, in __init__
       super().__init__(kafka_config_id=kafka_config_id)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/apache/kafka/hooks/base.py", line 43, in __init__
       self.get_conn
     File "/home/airflow/.local/lib/python3.7/site-packages/cached_property.py", line 36, in __get__
       value = obj.__dict__[self.func.__name__] = self.func(obj)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/apache/kafka/hooks/base.py", line 67, in get_conn
       return self._get_client(config)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/apache/kafka/hooks/consume.py", line 40, in _get_client
       return Consumer(config)
   cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="No such configuration property: "security_protocol""}`
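
The traceback shows the connection extras being passed straight to `Consumer(config)`, which is librdkafka-based and only accepts dot-separated property names. The underscore keys look like kafka-python-style names, so one thing to try is renaming them before building the extras. The following is only a sketch under that assumption: `KEY_RENAMES` and `to_librdkafka_config` are hypothetical helpers, not part of the provider, and the mapping is my reading of which librdkafka property each key was meant to be.

```python
import json

# Assumed mapping from the underscore-style keys used above to the
# dot-separated property names that librdkafka accepts.
KEY_RENAMES = {
    "security_protocol": "security.protocol",
    "sasl_mechanism": "sasl.mechanisms",
    "sasl_plain_username": "sasl.username",
    "sasl_plain_password": "sasl.password",
}


def to_librdkafka_config(config: dict) -> dict:
    """Return a copy of config with known underscore keys renamed.

    Keys that are not in KEY_RENAMES are passed through unchanged.
    """
    return {KEY_RENAMES.get(key, key): value for key, value in config.items()}


# Same placeholder values as in the snippet above.
original = {
    "bootstrap.servers": "hide.gcp.confluent.cloud:9092",
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "PLAIN",
    "sasl_plain_username": "hide",
    "sasl_plain_password": "hide",
    "group.id": "my-group",
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest",
}

# JSON string suitable for the connection's extra field.
extra = json.dumps(to_librdkafka_config(original))
```

If the renamed keys are accepted, the `No such configuration property` error should go away; whether the connection then authenticates against Confluent Cloud is a separate question that this sketch does not test.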

