This is an automated email from the ASF dual-hosted git repository.
joshfell pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c048bd5d9e Fix test_example_dags (#32714)
c048bd5d9e is described below
commit c048bd5d9e98b05a8c2b68af0ef1853a700888f7
Author: Augusto Hidalgo <[email protected]>
AuthorDate: Fri Aug 4 12:24:16 2023 -0400
Fix test_example_dags (#32714)
By going up to `parents[3]` we were going outside the repository root,
luckily (or unluckily) the repo folder is also named `airflow` so the
pattern `airflow/**/example_dags/example_*.py` still worked,
but `tests/system/providers/**/example_*.py` wasn't being used.
This discovered 2 new errors:
- `example_local_to_wasb.py` was trivial to fix
- `example_redis_publish.py` is more interesting: this one fails because
`RedisPubSubSensor` constructor calls Redis.pubsub().subscribe(), which
just hangs and DagBag fails with timeout. For now I'm just deleting this
operator from the example.
---
tests/always/test_example_dags.py | 8 +++++---
tests/system/providers/redis/example_redis_publish.py | 14 +-------------
2 files changed, 6 insertions(+), 16 deletions(-)
diff --git a/tests/always/test_example_dags.py
b/tests/always/test_example_dags.py
index 02533b21cf..2666a5b93a 100644
--- a/tests/always/test_example_dags.py
+++ b/tests/always/test_example_dags.py
@@ -26,7 +26,7 @@ from airflow.models import DagBag
from airflow.utils import yaml
from tests.test_utils.asserts import assert_queries_count
-AIRFLOW_SOURCES_ROOT = Path(__file__).resolve().parents[3]
+AIRFLOW_SOURCES_ROOT = Path(__file__).resolve().parents[2]
AIRFLOW_PROVIDERS_ROOT = AIRFLOW_SOURCES_ROOT / "airflow" / "providers"
NO_DB_QUERY_EXCEPTION = ["/airflow/example_dags/example_subdag_operator.py"]
@@ -54,7 +54,9 @@ def example_not_suspended_dags():
suspended_providers_folders = get_suspended_providers_folders()
possible_prefixes = ["airflow/providers/", "tests/system/providers/"]
suspended_providers_folders = [
- f"{prefix}{provider}" for prefix in possible_prefixes for provider in
suspended_providers_folders
+ AIRFLOW_SOURCES_ROOT.joinpath(prefix, provider).as_posix()
+ for prefix in possible_prefixes
+ for provider in suspended_providers_folders
]
for example_dir in example_dirs:
candidates = glob(f"{AIRFLOW_SOURCES_ROOT.as_posix()}/{example_dir}",
recursive=True)
@@ -68,7 +70,7 @@ def example_dags_except_db_exception():
return [
dag_file
for dag_file in example_not_suspended_dags()
- if any(not dag_file.endswith(e) for e in NO_DB_QUERY_EXCEPTION)
+ if not any(dag_file.endswith(e) for e in NO_DB_QUERY_EXCEPTION)
]
diff --git a/tests/system/providers/redis/example_redis_publish.py
b/tests/system/providers/redis/example_redis_publish.py
index e524a899af..4216b862d4 100644
--- a/tests/system/providers/redis/example_redis_publish.py
+++ b/tests/system/providers/redis/example_redis_publish.py
@@ -33,7 +33,6 @@ from datetime import datetime
from airflow import DAG
from airflow.providers.redis.operators.redis_publish import
RedisPublishOperator
from airflow.providers.redis.sensors.redis_key import RedisKeySensor
-from airflow.providers.redis.sensors.redis_pub_sub import RedisPubSubSensor
# [END import_module]
# [START instantiate_dag]
@@ -59,17 +58,6 @@ with DAG(
# [END RedisPublishOperator_DAG]
- # [START RedisPubSubSensor_DAG]
- pubsub_sensor_task = RedisPubSubSensor(
- task_id="pubsub_sensor_task",
- redis_conn_id="redis_default",
- channels="your_channel",
- dag=dag,
- timeout=600,
- poke_interval=30,
- )
- # [END RedisPubSubSensor_DAG]
-
# [START RedisKeySensor_DAG]
key_sensor_task = RedisKeySensor(
task_id="key_sensor_task",
@@ -81,7 +69,7 @@ with DAG(
)
# [END RedisKeySensor_DAG]
- publish_task >> pubsub_sensor_task >> key_sensor_task
+ publish_task >> key_sensor_task
from tests.system.utils.watcher import watcher