This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 404666ded0 Remove bad advice from Kafka system tests (#34470)
404666ded0 is described below
commit 404666ded04d60de050c0984d113b594aee50c71
Author: Andrey Anshin <[email protected]>
AuthorDate: Mon Sep 25 22:05:27 2023 +0400
Remove bad advice from Kafka system tests (#34470)
---
.../apache/kafka/example_dag_event_listener.py | 29 +++-------------------
1 file changed, 3 insertions(+), 26 deletions(-)
diff --git a/tests/system/providers/apache/kafka/example_dag_event_listener.py b/tests/system/providers/apache/kafka/example_dag_event_listener.py
index e42f28eb72..6e5ea11824 100644
--- a/tests/system/providers/apache/kafka/example_dag_event_listener.py
+++ b/tests/system/providers/apache/kafka/example_dag_event_listener.py
@@ -19,8 +19,6 @@
from __future__ import annotations
import json
-import random
-import string
from pendulum import datetime
@@ -32,7 +30,6 @@ from airflow.models import Connection
# This is just for setting up connections in the demo - you should use standard
# methods for setting these connections in production
from airflow.operators.python import PythonOperator
-from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageTriggerFunctionSensor
from airflow.utils import db
@@ -68,10 +65,6 @@ def _producer_function():
yield (json.dumps(i), json.dumps(i + 1))
-def _generate_uuid():
- return "".join(random.choices(string.ascii_lowercase, k=6))
-
-
with DAG(
dag_id="fizzbuzz-load-topic",
description="Load Data to fizz_buzz topic",
@@ -105,12 +98,9 @@ with DAG(
if val % 5 == 0:
return val
- def pick_downstream_dag(message, **context):
+ def wait_for_event(message, **context):
if message % 15 == 0:
- print(f"encountered {message} - executing external dag!")
- TriggerDagRunOperator(trigger_dag_id="fizz-buzz", task_id=f"{message}{_generate_uuid()}").execute(
- context
- )
+ return f"encountered {message}!"
else:
if message % 3 == 0:
print(f"encountered {message} FIZZ !")
@@ -123,25 +113,12 @@ with DAG(
task_id="listen_for_message",
topics=["fizz_buzz"],
apply_function="example_dag_event_listener.await_function",
- event_triggered_function=pick_downstream_dag,
+ event_triggered_function=wait_for_event,
)
# [END howto_sensor_await_message_trigger_function]
t0 >> t1
-with DAG(
- dag_id="fizz-buzz",
- description="Triggered when mod 15 is 0.",
- start_date=datetime(2022, 11, 1),
- catchup=False,
- tags=["fizz-buzz"],
-):
-
- def _fizz_buzz():
- print("FIZZ BUZZ")
-
- fizz_buzz_task = PythonOperator(task_id="fizz_buzz", python_callable=_fizz_buzz)
-
from tests.system.utils import get_test_run # noqa: E402