pauloventurab commented on issue #31330: URL: https://github.com/apache/airflow/issues/31330#issuecomment-1563127948
Hello again, Just to help in something, I think I could manage something using this example dag: https://github.com/apache/airflow/blob/providers-apache-kafka/1.1.0/tests/system/providers/apache/kafka/example_dag_event_listener.py The only problem is that I can't use the confluent cloud parameters: ``` extra=json.dumps( { "bootstrap.servers": "hide.gcp.confluent.cloud:9092", "security_protocol": "SASL_SSL", "sasl_mechanism": "PLAIN", "sasl_plain_username": "hide", "sasl_plain_password": "hide", "group.id": "my-group", "enable.auto.commit": False, "auto.offset.reset": "earliest", } ``` Error: `Traceback (most recent call last): File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/triggerer_job_runner.py", line 529, in cleanup_finished_triggers result = details["task"].result() File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/triggerer_job_runner.py", line 607, in run_trigger async for event in trigger.run(): File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/apache/kafka/triggers/await_message.py", line 91, in run consumer_hook = KafkaConsumerHook(topics=self.topics, kafka_config_id=self.kafka_config_id) File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/apache/kafka/hooks/consume.py", line 36, in __init__ super().__init__(kafka_config_id=kafka_config_id) File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/apache/kafka/hooks/base.py", line 43, in __init__ self.get_conn File "/home/airflow/.local/lib/python3.7/site-packages/cached_property.py", line 36, in __get__ value = obj.__dict__[self.func.__name__] = self.func(obj) File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/apache/kafka/hooks/base.py", line 67, in get_conn return self._get_client(config) File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/apache/kafka/hooks/consume.py", line 40, in _get_client return Consumer(config) cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="No such configuration property: "security_protocol""}` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
