This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 404666ded0 Remove bad advice from Kafka system tests (#34470)
404666ded0 is described below

commit 404666ded04d60de050c0984d113b594aee50c71
Author: Andrey Anshin <[email protected]>
AuthorDate: Mon Sep 25 22:05:27 2023 +0400

    Remove bad advice from Kafka system tests (#34470)
---
 .../apache/kafka/example_dag_event_listener.py     | 29 +++-------------------
 1 file changed, 3 insertions(+), 26 deletions(-)

diff --git a/tests/system/providers/apache/kafka/example_dag_event_listener.py b/tests/system/providers/apache/kafka/example_dag_event_listener.py
index e42f28eb72..6e5ea11824 100644
--- a/tests/system/providers/apache/kafka/example_dag_event_listener.py
+++ b/tests/system/providers/apache/kafka/example_dag_event_listener.py
@@ -19,8 +19,6 @@
 from __future__ import annotations
 
 import json
-import random
-import string
 
 from pendulum import datetime
 
@@ -32,7 +30,6 @@ from airflow.models import Connection
 # This is just for setting up connections in the demo - you should use standard
 # methods for setting these connections in production
 from airflow.operators.python import PythonOperator
-from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
 from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageTriggerFunctionSensor
 from airflow.utils import db
@@ -68,10 +65,6 @@ def _producer_function():
         yield (json.dumps(i), json.dumps(i + 1))
 
 
-def _generate_uuid():
-    return "".join(random.choices(string.ascii_lowercase, k=6))
-
-
 with DAG(
     dag_id="fizzbuzz-load-topic",
     description="Load Data to fizz_buzz topic",
@@ -105,12 +98,9 @@ with DAG(
         if val % 5 == 0:
             return val
 
-    def pick_downstream_dag(message, **context):
+    def wait_for_event(message, **context):
         if message % 15 == 0:
-            print(f"encountered {message} - executing external dag!")
-            TriggerDagRunOperator(trigger_dag_id="fizz-buzz", task_id=f"{message}{_generate_uuid()}").execute(
-                context
-            )
+            return f"encountered {message}!"
         else:
             if message % 3 == 0:
                 print(f"encountered {message} FIZZ !")
@@ -123,25 +113,12 @@ with DAG(
         task_id="listen_for_message",
         topics=["fizz_buzz"],
         apply_function="example_dag_event_listener.await_function",
-        event_triggered_function=pick_downstream_dag,
+        event_triggered_function=wait_for_event,
     )
     # [END howto_sensor_await_message_trigger_function]
 
     t0 >> t1
 
-with DAG(
-    dag_id="fizz-buzz",
-    description="Triggered when mod 15 is 0.",
-    start_date=datetime(2022, 11, 1),
-    catchup=False,
-    tags=["fizz-buzz"],
-):
-
-    def _fizz_buzz():
-        print("FIZZ BUZZ")
-
-    fizz_buzz_task = PythonOperator(task_id="fizz_buzz", python_callable=_fizz_buzz)
-
 
 from tests.system.utils import get_test_run  # noqa: E402
 

Reply via email to