This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 424fc17d49 fixed some errant strings in the kafka example dags (#30818)
424fc17d49 is described below
commit 424fc17d49afd4175826a62aa4fe7aa7c5772143
Author: Dylan Storey <[email protected]>
AuthorDate: Sat Apr 22 17:14:08 2023 -0400
fixed some errant strings in the kafka example dags (#30818)
* fixed some errant strings in the kafka example dags
* fixed some errant strings in the kafka example dags
---
.../apache/kafka/example_dag_event_listener.py | 60 +++++---
.../apache/kafka/example_dag_hello_kafka.py | 158 +++++++++++----------
2 files changed, 117 insertions(+), 101 deletions(-)
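Summary of the change: the example DAGs previously kept their Kafka connection setup as commented-out code and referenced callables through stale module prefixes ("hello_kafka", "event_listener"). The commit uncomments that setup into a load_connections() task, corrects the dotted paths to the real module names, and fixes a `&` that should have been `%` plus a trigger_dag_id that used an underscore instead of a hyphen. A minimal sketch of the load_connections pattern, assuming Airflow's Connection model and the airflow.utils.db helpers (the conn_id and broker address here are illustrative, not the commit's exact values):

import json

from airflow.models import Connection
from airflow.utils import db


def load_connections():
    # merge_conn only inserts the connection if no connection with this
    # conn_id exists yet, so re-running the task is harmless.
    db.merge_conn(
        Connection(
            conn_id="my_kafka",  # illustrative conn_id
            conn_type="kafka",
            extra=json.dumps({"bootstrap.servers": "broker:29092"}),
        )
    )


# Inside a `with DAG(...) as dag:` block, run it before any Kafka task:
#     t0 = PythonOperator(task_id="load_connections", python_callable=load_connections)
#     t0 >> first_kafka_task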
diff --git a/tests/system/providers/apache/kafka/example_dag_event_listener.py b/tests/system/providers/apache/kafka/example_dag_event_listener.py
index c6d0df8d4a..f7023ec4e2 100644
--- a/tests/system/providers/apache/kafka/example_dag_event_listener.py
+++ b/tests/system/providers/apache/kafka/example_dag_event_listener.py
@@ -26,31 +26,41 @@ from pendulum import datetime
from airflow import DAG
+# Connections needed for this example dag to finish
+from airflow.models import Connection
+
# This is just for setting up connections in the demo - you should use standard
# methods for setting these connections in production
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageTriggerFunctionSensor
+from airflow.utils import db
-# Connections needed for this example dag to finish
-# from airflow.models import Connection
-# from airflow.utils import db
-#
-# db.merge_conn(
-# Connection(
-# conn_id="fizz_buzz",
-# conn_type="kafka",
-# extra=json.dumps(
-# {
-# "bootstrap.servers": "broker:29092",
-# "group.id": "fizz_buzz",
-# "enable.auto.commit": False,
-# "auto.offset.reset": "beginning",
-# }
-# ),
-# )
-# )
+
+def load_connections():
+ db.merge_conn(
+ Connection(
+ conn_id="fizz_buzz_1",
+ conn_type="kafka",
+            extra=json.dumps({"socket.timeout.ms": 10, "bootstrap.servers": "broker:29092"}),
+ )
+ )
+
+ db.merge_conn(
+ Connection(
+ conn_id="fizz_buzz_2",
+ conn_type="kafka",
+ extra=json.dumps(
+ {
+ "bootstrap.servers": "broker:29092",
+ "group.id": "fizz_buzz",
+ "enable.auto.commit": False,
+ "auto.offset.reset": "beginning",
+ }
+ ),
+ )
+ )
def _producer_function():
@@ -71,8 +81,10 @@ with DAG(
tags=["fizz-buzz"],
) as dag:
+    t0 = PythonOperator(task_id="load_connections", python_callable=load_connections)
+
t1 = ProduceToTopicOperator(
- kafka_config_id="fizz_buzz",
+ kafka_config_id="fizz_buzz_1",
task_id="produce_to_topic",
topic="fizz_buzz",
producer_function=_producer_function,
@@ -97,25 +109,27 @@ with DAG(
def pick_downstream_dag(message, **context):
if message % 15 == 0:
print(f"encountered {message} - executing external dag!")
-            TriggerDagRunOperator(trigger_dag_id="fizz_buzz", task_id=f"{message}{_generate_uuid()}").execute(
+            TriggerDagRunOperator(trigger_dag_id="fizz-buzz", task_id=f"{message}{_generate_uuid()}").execute(
context
)
else:
if message % 3 == 0:
print(f"encountered {message} FIZZ !")
- if message & 5 == 0:
+ if message % 5 == 0:
print(f"encountered {message} BUZZ !")
# [START howto_sensor_await_message_trigger_function]
listen_for_message = AwaitMessageTriggerFunctionSensor(
- kafka_config_id="fizz_buzz",
+ kafka_config_id="fizz_buzz_2",
task_id="listen_for_message",
topics=["fizz_buzz"],
- apply_function="event_listener.await_function",
+ apply_function="example_dag_event_listener.await_function",
event_triggered_function=pick_downstream_dag,
)
# [END howto_sensor_await_message_trigger_function]
+ t0 >> t1
+
with DAG(
dag_id="fizz-buzz",
description="Triggered when mod 15 is 0.",
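Note on the listener DAG above: AwaitMessageTriggerFunctionSensor resolves apply_function from its dotted string path at runtime, which is why the prefix moves from "event_listener" to "example_dag_event_listener", the module's actual file name. The await_function body itself sits outside this diff; a plausible shape for the fizz-buzz filter, purely for illustration:

import json


def await_function(message):
    # `message` is a confluent-kafka Message; returning a non-None value
    # fires the trigger and hands the value to pick_downstream_dag.
    val = json.loads(message.value())
    if val % 3 == 0 or val % 5 == 0:
        return val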
diff --git a/tests/system/providers/apache/kafka/example_dag_hello_kafka.py b/tests/system/providers/apache/kafka/example_dag_hello_kafka.py
index 83fb2d930a..4bc2b44451 100644
--- a/tests/system/providers/apache/kafka/example_dag_hello_kafka.py
+++ b/tests/system/providers/apache/kafka/example_dag_hello_kafka.py
@@ -30,79 +30,6 @@ from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor
-# Connections needed for this example dag to finish
-# from airflow.models import Connection
-# from airflow.utils import db
-# db.merge_conn(
-# Connection(
-# conn_id="t1-3",
-# conn_type="kafka",
-#         extra=json.dumps({"socket.timeout.ms": 10, "bootstrap.servers": "broker:29092"}),
-# )
-# )
-
-# db.merge_conn(
-# Connection(
-# conn_id="t2",
-# conn_type="kafka",
-# extra=json.dumps(
-# {
-# "bootstrap.servers": "broker:29092",
-# "group.id": "t2",
-# "enable.auto.commit": False,
-# "auto.offset.reset": "beginning",
-# }
-# ),
-# )
-# )
-
-# db.merge_conn(
-# Connection(
-# conn_id="t4",
-# conn_type="kafka",
-# extra=json.dumps(
-# {
-# "bootstrap.servers": "broker:29092",
-# "group.id": "t4",
-# "enable.auto.commit": False,
-# "auto.offset.reset": "beginning",
-# }
-# ),
-# )
-# )
-
-# db.merge_conn(
-# Connection(
-# conn_id="t4b",
-# conn_type="kafka",
-# extra=json.dumps(
-# {
-# "bootstrap.servers": "broker:29092",
-# "group.id": "t4b",
-# "enable.auto.commit": False,
-# "auto.offset.reset": "beginning",
-# }
-# ),
-# )
-# )
-
-
-# db.merge_conn(
-# Connection(
-# conn_id="t5",
-# conn_type="kafka",
-# extra=json.dumps(
-# {
-# "bootstrap.servers": "broker:29092",
-# "group.id": "t5",
-# "enable.auto.commit": False,
-# "auto.offset.reset": "beginning",
-# }
-# ),
-# )
-# )
-
-
default_args = {
"owner": "airflow",
"depend_on_past": False,
@@ -113,6 +40,80 @@ default_args = {
}
+def load_connections():
+ # Connections needed for this example dag to finish
+ from airflow.models import Connection
+ from airflow.utils import db
+
+ db.merge_conn(
+ Connection(
+ conn_id="t1-3",
+ conn_type="kafka",
+            extra=json.dumps({"socket.timeout.ms": 10, "bootstrap.servers": "broker:29092"}),
+ )
+ )
+
+ db.merge_conn(
+ Connection(
+ conn_id="t2",
+ conn_type="kafka",
+ extra=json.dumps(
+ {
+ "bootstrap.servers": "broker:29092",
+ "group.id": "t2",
+ "enable.auto.commit": False,
+ "auto.offset.reset": "beginning",
+ }
+ ),
+ )
+ )
+
+ db.merge_conn(
+ Connection(
+ conn_id="t4",
+ conn_type="kafka",
+ extra=json.dumps(
+ {
+ "bootstrap.servers": "broker:29092",
+ "group.id": "t4",
+ "enable.auto.commit": False,
+ "auto.offset.reset": "beginning",
+ }
+ ),
+ )
+ )
+
+ db.merge_conn(
+ Connection(
+ conn_id="t4b",
+ conn_type="kafka",
+ extra=json.dumps(
+ {
+ "bootstrap.servers": "broker:29092",
+ "group.id": "t4b",
+ "enable.auto.commit": False,
+ "auto.offset.reset": "beginning",
+ }
+ ),
+ )
+ )
+
+ db.merge_conn(
+ Connection(
+ conn_id="t5",
+ conn_type="kafka",
+ extra=json.dumps(
+ {
+ "bootstrap.servers": "broker:29092",
+ "group.id": "t5",
+ "enable.auto.commit": False,
+ "auto.offset.reset": "beginning",
+ }
+ ),
+ )
+ )
+
+
def producer_function():
for i in range(20):
yield (json.dumps(i), json.dumps(i + 1))
@@ -155,13 +156,14 @@ with DAG(
catchup=False,
tags=["example"],
) as dag:
+    t0 = PythonOperator(task_id="load_connections", python_callable=load_connections)
# [START howto_operator_produce_to_topic]
t1 = ProduceToTopicOperator(
kafka_config_id="t1-3",
task_id="produce_to_topic",
topic="test_1",
- producer_function="hello_kafka.producer_function",
+ producer_function="example_dag_hello_kafka.producer_function",
)
# [END howto_operator_produce_to_topic]
@@ -173,7 +175,7 @@ with DAG(
kafka_config_id="t2",
task_id="consume_from_topic",
topics=["test_1"],
- apply_function="hello_kafka.consumer_function",
+ apply_function="example_dag_hello_kafka.consumer_function",
apply_function_kwargs={"prefix": "consumed:::"},
commit_cadence="end_of_batch",
max_messages=10,
@@ -222,7 +224,7 @@ with DAG(
kafka_config_id="t5",
task_id="awaiting_message",
topics=["test_1"],
- apply_function="hello_kafka.await_function",
+ apply_function="example_dag_hello_kafka.await_function",
xcom_push_key="retrieved_message",
)
# [END howto_sensor_await_message]
@@ -234,8 +236,8 @@ with DAG(
    t6.doc_md = "The task that is executed after the deferrable task returns for execution."
- t1 >> t2
- t3 >> [t4, t4b] >> t5 >> t6
+ t0 >> t1 >> t2
+ t0 >> t3 >> [t4, t4b] >> t5 >> t6
from tests.system.utils import get_test_run # noqa: E402
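These system-test modules conventionally end by materializing a pytest entry point from that import, along the lines of:

# Needed so `pytest tests/system/providers/apache/kafka/...` can run the DAG.
test_run = get_test_run(dag)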