This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 424fc17d49 fixed some errant strings in the kafka example dags (#30818)
424fc17d49 is described below

commit 424fc17d49afd4175826a62aa4fe7aa7c5772143
Author: Dylan Storey <[email protected]>
AuthorDate: Sat Apr 22 17:14:08 2023 -0400

    fixed some errant strings in the kafka example dags (#30818)
    
    * fixed some errant strings in the kafka example dags
    
    * fixed some errant strings in the kafka example dags
---
 .../apache/kafka/example_dag_event_listener.py     |  60 +++++---
 .../apache/kafka/example_dag_hello_kafka.py        | 158 +++++++++++----------
 2 files changed, 117 insertions(+), 101 deletions(-)

diff --git a/tests/system/providers/apache/kafka/example_dag_event_listener.py b/tests/system/providers/apache/kafka/example_dag_event_listener.py
index c6d0df8d4a..f7023ec4e2 100644
--- a/tests/system/providers/apache/kafka/example_dag_event_listener.py
+++ b/tests/system/providers/apache/kafka/example_dag_event_listener.py
@@ -26,31 +26,41 @@ from pendulum import datetime
 
 from airflow import DAG
 
+# Connections needed for this example dag to finish
+from airflow.models import Connection
+
 # This is just for setting up connections in the demo - you should use standard
 # methods for setting these connections in production
 from airflow.operators.python import PythonOperator
 from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
 from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageTriggerFunctionSensor
+from airflow.utils import db
 
-# Connections needed for this example dag to finish
-# from airflow.models import Connection
-# from airflow.utils import db
-#
-# db.merge_conn(
-#     Connection(
-#         conn_id="fizz_buzz",
-#         conn_type="kafka",
-#         extra=json.dumps(
-#             {
-#                 "bootstrap.servers": "broker:29092",
-#                 "group.id": "fizz_buzz",
-#                 "enable.auto.commit": False,
-#                 "auto.offset.reset": "beginning",
-#             }
-#         ),
-#     )
-# )
+
+def load_connections():
+    db.merge_conn(
+        Connection(
+            conn_id="fizz_buzz_1",
+            conn_type="kafka",
+            extra=json.dumps({"socket.timeout.ms": 10, "bootstrap.servers": "broker:29092"}),
+        )
+    )
+
+    db.merge_conn(
+        Connection(
+            conn_id="fizz_buzz_2",
+            conn_type="kafka",
+            extra=json.dumps(
+                {
+                    "bootstrap.servers": "broker:29092",
+                    "group.id": "fizz_buzz",
+                    "enable.auto.commit": False,
+                    "auto.offset.reset": "beginning",
+                }
+            ),
+        )
+    )
 
 
 def _producer_function():
@@ -71,8 +81,10 @@ with DAG(
     tags=["fizz-buzz"],
 ) as dag:
 
+    t0 = PythonOperator(task_id="load_connections", python_callable=load_connections)
+
     t1 = ProduceToTopicOperator(
-        kafka_config_id="fizz_buzz",
+        kafka_config_id="fizz_buzz_1",
         task_id="produce_to_topic",
         topic="fizz_buzz",
         producer_function=_producer_function,
@@ -97,25 +109,27 @@ with DAG(
     def pick_downstream_dag(message, **context):
         if message % 15 == 0:
             print(f"encountered {message} - executing external dag!")
-            TriggerDagRunOperator(trigger_dag_id="fizz_buzz", task_id=f"{message}{_generate_uuid()}").execute(
+            TriggerDagRunOperator(trigger_dag_id="fizz-buzz", task_id=f"{message}{_generate_uuid()}").execute(
                 context
             )
         else:
             if message % 3 == 0:
                 print(f"encountered {message} FIZZ !")
-            if message & 5 == 0:
+            if message % 5 == 0:
                 print(f"encountered {message} BUZZ !")
 
     # [START howto_sensor_await_message_trigger_function]
     listen_for_message = AwaitMessageTriggerFunctionSensor(
-        kafka_config_id="fizz_buzz",
+        kafka_config_id="fizz_buzz_2",
         task_id="listen_for_message",
         topics=["fizz_buzz"],
-        apply_function="event_listener.await_function",
+        apply_function="example_dag_event_listener.await_function",
         event_triggered_function=pick_downstream_dag,
     )
     # [END howto_sensor_await_message_trigger_function]
 
+    t0 >> t1
+
 with DAG(
     dag_id="fizz-buzz",
     description="Triggered when mod 15 is 0.",
diff --git a/tests/system/providers/apache/kafka/example_dag_hello_kafka.py b/tests/system/providers/apache/kafka/example_dag_hello_kafka.py
index 83fb2d930a..4bc2b44451 100644
--- a/tests/system/providers/apache/kafka/example_dag_hello_kafka.py
+++ b/tests/system/providers/apache/kafka/example_dag_hello_kafka.py
@@ -30,79 +30,6 @@ from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOpe
 from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
 from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor
 
-# Connections needed for this example dag to finish
-# from airflow.models import Connection
-# from airflow.utils import db
-# db.merge_conn(
-#     Connection(
-#         conn_id="t1-3",
-#         conn_type="kafka",
-#         extra=json.dumps({"socket.timeout.ms": 10, "bootstrap.servers": "broker:29092"}),
-#     )
-# )
-
-# db.merge_conn(
-#     Connection(
-#         conn_id="t2",
-#         conn_type="kafka",
-#         extra=json.dumps(
-#             {
-#                 "bootstrap.servers": "broker:29092",
-#                 "group.id": "t2",
-#                 "enable.auto.commit": False,
-#                 "auto.offset.reset": "beginning",
-#             }
-#         ),
-#     )
-# )
-
-# db.merge_conn(
-#     Connection(
-#         conn_id="t4",
-#         conn_type="kafka",
-#         extra=json.dumps(
-#             {
-#                 "bootstrap.servers": "broker:29092",
-#                 "group.id": "t4",
-#                 "enable.auto.commit": False,
-#                 "auto.offset.reset": "beginning",
-#             }
-#         ),
-#     )
-# )
-
-# db.merge_conn(
-#     Connection(
-#         conn_id="t4b",
-#         conn_type="kafka",
-#         extra=json.dumps(
-#             {
-#                 "bootstrap.servers": "broker:29092",
-#                 "group.id": "t4b",
-#                 "enable.auto.commit": False,
-#                 "auto.offset.reset": "beginning",
-#             }
-#         ),
-#     )
-# )
-
-
-# db.merge_conn(
-#     Connection(
-#         conn_id="t5",
-#         conn_type="kafka",
-#         extra=json.dumps(
-#             {
-#                 "bootstrap.servers": "broker:29092",
-#                 "group.id": "t5",
-#                 "enable.auto.commit": False,
-#                 "auto.offset.reset": "beginning",
-#             }
-#         ),
-#     )
-# )
-
-
 default_args = {
     "owner": "airflow",
     "depend_on_past": False,
@@ -113,6 +40,80 @@ default_args = {
 }
 
 
+def load_connections():
+    # Connections needed for this example dag to finish
+    from airflow.models import Connection
+    from airflow.utils import db
+
+    db.merge_conn(
+        Connection(
+            conn_id="t1-3",
+            conn_type="kafka",
+            extra=json.dumps({"socket.timeout.ms": 10, "bootstrap.servers": "broker:29092"}),
+        )
+    )
+
+    db.merge_conn(
+        Connection(
+            conn_id="t2",
+            conn_type="kafka",
+            extra=json.dumps(
+                {
+                    "bootstrap.servers": "broker:29092",
+                    "group.id": "t2",
+                    "enable.auto.commit": False,
+                    "auto.offset.reset": "beginning",
+                }
+            ),
+        )
+    )
+
+    db.merge_conn(
+        Connection(
+            conn_id="t4",
+            conn_type="kafka",
+            extra=json.dumps(
+                {
+                    "bootstrap.servers": "broker:29092",
+                    "group.id": "t4",
+                    "enable.auto.commit": False,
+                    "auto.offset.reset": "beginning",
+                }
+            ),
+        )
+    )
+
+    db.merge_conn(
+        Connection(
+            conn_id="t4b",
+            conn_type="kafka",
+            extra=json.dumps(
+                {
+                    "bootstrap.servers": "broker:29092",
+                    "group.id": "t4b",
+                    "enable.auto.commit": False,
+                    "auto.offset.reset": "beginning",
+                }
+            ),
+        )
+    )
+
+    db.merge_conn(
+        Connection(
+            conn_id="t5",
+            conn_type="kafka",
+            extra=json.dumps(
+                {
+                    "bootstrap.servers": "broker:29092",
+                    "group.id": "t5",
+                    "enable.auto.commit": False,
+                    "auto.offset.reset": "beginning",
+                }
+            ),
+        )
+    )
+
+
 def producer_function():
     for i in range(20):
         yield (json.dumps(i), json.dumps(i + 1))
@@ -155,13 +156,14 @@ with DAG(
     catchup=False,
     tags=["example"],
 ) as dag:
+    t0 = PythonOperator(task_id="load_connections", python_callable=load_connections)
 
     # [START howto_operator_produce_to_topic]
     t1 = ProduceToTopicOperator(
         kafka_config_id="t1-3",
         task_id="produce_to_topic",
         topic="test_1",
-        producer_function="hello_kafka.producer_function",
+        producer_function="example_dag_hello_kafka.producer_function",
     )
     # [END howto_operator_produce_to_topic]
 
@@ -173,7 +175,7 @@ with DAG(
         kafka_config_id="t2",
         task_id="consume_from_topic",
         topics=["test_1"],
-        apply_function="hello_kafka.consumer_function",
+        apply_function="example_dag_hello_kafka.consumer_function",
         apply_function_kwargs={"prefix": "consumed:::"},
         commit_cadence="end_of_batch",
         max_messages=10,
@@ -222,7 +224,7 @@ with DAG(
         kafka_config_id="t5",
         task_id="awaiting_message",
         topics=["test_1"],
-        apply_function="hello_kafka.await_function",
+        apply_function="example_dag_hello_kafka.await_function",
         xcom_push_key="retrieved_message",
     )
     # [END howto_sensor_await_message]
@@ -234,8 +236,8 @@ with DAG(
 
     t6.doc_md = "The task that is executed after the deferable task returns for execution."
 
-    t1 >> t2
-    t3 >> [t4, t4b] >> t5 >> t6
+    t0 >> t1 >> t2
+    t0 >> t3 >> [t4, t4b] >> t5 >> t6
 
 
 from tests.system.utils import get_test_run  # noqa: E402

Reply via email to