Copilot commented on code in PR #64833:
URL: https://github.com/apache/airflow/pull/64833#discussion_r3066503655
##########
dev/breeze/src/airflow_breeze/utils/selective_checks.py:
##########
@@ -198,6 +199,13 @@ def __hash__(self):
r"^airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/.*",
r"^providers/elasticsearch/.*",
],
+ FileGroupForCi.EVENT_DRIVEN_E2E_FILES: [
+
r"^airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/.*",
+
r"^airflow-e2e-tests/tests/airflow_e2e_tests/dags/example_event_driven\.py$",
+ r"^airflow-e2e-tests/docker/kafka/.*",
Review Comment:
The selective-checks file patterns include
`airflow-e2e-tests/docker/kafka/.*` but do not include the compose file
`airflow-e2e-tests/docker/kafka.yml`. Changes to `kafka.yml` would not trigger
the event-driven E2E suite. Add a pattern for
`^airflow-e2e-tests/docker/kafka\.yml$` (or a broader pattern covering both the
directory and the top-level compose file).
```suggestion
r"^airflow-e2e-tests/docker/kafka/.*",
r"^airflow-e2e-tests/docker/kafka\.yml$",
```
##########
airflow-e2e-tests/tests/airflow_e2e_tests/dags/example_event_driven.py:
##########
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import json
+from typing import TYPE_CHECKING
+
+import pendulum
+
+from airflow.providers.apache.kafka.operators.produce import
ProduceToTopicOperator
+from airflow.providers.common.messaging.triggers.msg_queue import
MessageQueueTrigger
+from airflow.sdk import Asset, AssetWatcher, dag, get_current_context, task
+
+if TYPE_CHECKING:
+ from airflow.sdk.execution_time.context import Context,
TriggeringAssetEventsAccessor
+
+KAFKA_CONFIG_ID = "kafka_default"
+TOPICS = ["fizz_buzz"]
+DLQ_TOPIC = "dlq"
+RETRY_COUNT = 3
+"""Airflow Kafka connection
+AIRFLOW_CONN_KAFKA_DEFAULT='{
+ "conn_type": "general",
+ "extra": {
+ "bootstrap.servers": "broker:29092",
+ "group.id": "kafka_default_group",
+ "security.protocol": "PLAINTEXT",
+ "enable.auto.commit": false,
+ "auto.offset.reset": "latest"
+ }
+}'
+"""
+"""Kafka Command to verify messages are being produced to the topic:
+
+# Create Topic
+/bin/kafka-topics --bootstrap-server broker:29092 --create --topic fizz_buzz
+/bin/kafka-topics --bootstrap-server broker:29092 --create --topic dlq
+
+
+# Get offsets for the topic to verify messages are being produced
+/bin/kafka-get-offsets --bootstrap-server broker:29092 --topic fizz_buzz
+/bin/kafka-get-offsets --bootstrap-server broker:29092 --topic dlq
+
+# List consumer groups to verify our consumer group is being registered
+/bin/kafka-consumer-groups --bootstrap-server broker:29092 --list
+
+# Get current offsets for the consumer group to verify messages are being
consumed
+/bin/kafka-consumer-groups --bootstrap-server broker:29092 --describe --group
group_1
+"""
+
+SAMPLE_ORDERS = [
+ {"order_id": "ORD-1001", "customer": "alice", "item": "widget",
"quantity": 2, "price": 9.99},
+ {"order_id": "ORD-1002", "customer": "bob", "item": "gadget", "quantity":
1, "price": 24.50},
+ {"order_id": "ORD-1003", "customer": "carol", "item": "widget",
"quantity": 5, "price": 9.99},
+ {"order_id": "ORD-1004", "customer": "dave", "item": "doohickey",
"quantity": 3, "price": 14.75},
+ {"order_id": "ORD-1005", "customer": "eve", "item": "thingamajig",
"quantity": 1, "price": 39.00},
+ {"order_id": "ORD-1006", "customer": "frank", "item": "widget",
"quantity": 10, "price": 9.99},
+ {"order_id": "ORD-1007", "customer": "grace", "item": "gadget",
"quantity": 2, "price": 24.50},
+ {"order_id": "ORD-1008", "customer": "heidi", "item": "doohickey",
"quantity": 1, "price": 14.75},
+]
+
+
+def producer_function():
+ for order in SAMPLE_ORDERS:
+ yield (json.dumps(order["order_id"]), json.dumps(order))
+ # produce a malformed message to demonstrate error handling
+ yield ("malformed_message", "malformed_message")
+
+
+def process_one_message(message: str):
+ order = json.loads(message)
+ total = order["quantity"] * order["price"]
+ print(f"Order {order['order_id']}: {order['quantity']}x {order['item']} =
${total:.2f}")
+ return order
+
+
+def handle_dlq():
+ context: Context = get_current_context()
+ triggering_asset_events: TriggeringAssetEventsAccessor =
context["triggering_asset_events"]
+ for event in triggering_asset_events[kafka_cdc_asset]:
+ print(f"Handling failed message from event: {event}")
+ value = json.dumps(
+ {
+ "asset": event.asset.model_dump(mode="json"),
+ "extra": event.extra,
+ }
+ )
+ yield (json.dumps(event.asset.uri), value)
+
+
+# Airflow 3 example
+# Define a trigger that listens to an external message queue (Apache Kafka in
this case)
+trigger = MessageQueueTrigger(
+ scheme="kafka",
+ # the rest of the parameters are used by the trigger
+ kafka_config_id=KAFKA_CONFIG_ID,
+ topics=TOPICS,
+ poll_interval=1,
+ poll_timeout=1,
+ commit_offset=True,
+)
+
+# Define an asset that watches for messages on the queue
+kafka_cdc_asset = Asset("kafka_cdc_asset",
watchers=[AssetWatcher(name="kafka_cdc", trigger=trigger)])
+
+
+@dag(
+ schedule=[kafka_cdc_asset],
+ tags=["event-driven"],
+)
+def event_driven_consumer():
+
+ @task(retries=RETRY_COUNT, retry_delay=1)
Review Comment:
`retry_delay` is expected to be a `datetime.timedelta` (or equivalent
duration), not an `int`. Passing `1` is likely to raise a validation/type error
at DAG parse/runtime. Use a `timedelta(seconds=1)` (or pendulum duration)
instead.
##########
airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/test_event_driven.py:
##########
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""E2E tests for the event-driven DAG pattern using Apache Kafka.
+
+The producer DAG sends 8 valid orders + 1 malformed message to the
``fizz_buzz``
+Kafka topic. The consumer DAG is scheduled on an Asset with a Kafka
AssetWatcher,
+so each message triggers a separate consumer DAG run (9 total).
+"""
+
+from __future__ import annotations
+
+import time
+
+from airflow_e2e_tests.e2e_test_utils.clients import AirflowClient
+
+PRODUCER_DAG_ID = "event_driven_producer"
+CONSUMER_DAG_ID = "event_driven_consumer"
+# 8 valid orders + 1 malformed message
+EXPECTED_CONSUMER_RUNS = 9
+EXPECTED_FIZZ_BUZZ_OFFSET = 9
+EXPECTED_DLQ_OFFSET = 1
+
+
+def _parse_topic_offset(raw_output: str, topic: str) -> int:
+ """Parse ``kafka-get-offsets`` output (``topic:partition:offset``) and
return the offset."""
+ for line in raw_output.strip().splitlines():
+ parts = line.strip().split(":")
+ if len(parts) == 3 and parts[0] == topic:
+ return int(parts[2])
+ raise ValueError(f"Could not find offset for topic '{topic}' in
output:\n{raw_output}")
+
+
+class TestEventDrivenDag:
+ """Test the Kafka-based event-driven producer/consumer DAG pair."""
+
+ airflow_client = AirflowClient()
+
+ # ------------------------------------------------------------------
+ # Helpers
+ # ------------------------------------------------------------------
+
+ def _wait_for_consumer_dag_runs(
+ self, expected_count: int, timeout: int = 600, check_interval: int = 10
+ ) -> list[dict]:
+ """Poll until *expected_count* consumer DAG runs reach a terminal
state."""
+ start = time.monotonic()
+ while time.monotonic() - start < timeout:
+ response = self.airflow_client.list_dag_runs(CONSUMER_DAG_ID)
+ runs = response.get("dag_runs", [])
+ terminal_runs = [r for r in runs if r["state"] in ("success",
"failed")]
+ if len(terminal_runs) >= expected_count:
+ return terminal_runs
Review Comment:
`_wait_for_consumer_dag_runs` returns as soon as terminal runs are `>=
expected_count`, but returns *all* terminal runs. If there are more than
`EXPECTED_CONSUMER_RUNS` (e.g., retries creating additional runs in some
setups, or any pre-existing runs), the subsequent equality assertion will fail.
Consider returning exactly `expected_count` runs (e.g., filter/sort to the most
recent ones) or change the assertion to accept `>=` after filtering to runs
created by this test.
```suggestion
terminal_runs = sorted(
terminal_runs,
key=lambda r: (
r.get("end_date") or "",
r.get("start_date") or "",
r.get("logical_date") or "",
r.get("dag_run_id") or "",
),
reverse=True,
)
return terminal_runs[:expected_count]
```
##########
airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py:
##########
@@ -95,6 +96,59 @@ def _setup_elasticsearch_integration(dot_env_file, tmp_dir):
os.environ["ENV_FILE_PATH"] = str(dot_env_file)
+def _copy_kafka_files(tmp_dir):
+ """Copy Kafka compose file and init script into the temp directory."""
+ copyfile(KAFKA_DIR_PATH.parent / "kafka.yml", tmp_dir / "kafka.yml")
+
+ kafka_dir = tmp_dir / "kafka"
+ kafka_dir.mkdir()
+ copyfile(KAFKA_DIR_PATH / "update_run.sh", kafka_dir / "update_run.sh")
+ current_permissions = os.stat(kafka_dir / "update_run.sh").st_mode
+ os.chmod(kafka_dir / "update_run.sh", current_permissions | 0o111)
+
+
+def _setup_event_driven_integration(dot_env_file, tmp_dir):
+ _copy_kafka_files(tmp_dir)
+
+ kafka_conn = json.dumps(
+ {
+ "conn_type": "general",
+ "extra": {
+ "bootstrap.servers": "broker:29092",
+ "group.id": "kafka_default_group",
+ "security.protocol": "PLAINTEXT",
+ "enable.auto.commit": False,
+ "auto.offset.reset": "latest",
+ },
+ }
+ )
+
+ dot_env_file.write_text(
+ f"AIRFLOW_UID={os.getuid()}\n"
+ f"AIRFLOW_CONN_KAFKA_DEFAULT='{kafka_conn}'\n"
+ "_PIP_ADDITIONAL_REQUIREMENTS="
+ "apache-airflow-providers-apache-kafka
apache-airflow-providers-common-messaging\n"
+ )
+ os.environ["ENV_FILE_PATH"] = str(dot_env_file)
+
+
+def _create_kafka_topics(compose_instance):
+ """Create Kafka topics required by the event-driven DAG."""
+ for topic in ("fizz_buzz", "dlq"):
+ compose_instance.exec_in_container(
+ command=[
+ "kafka-topics",
+ "--bootstrap-server",
+ "broker:29092",
+ "--create",
+ "--topic",
+ topic,
+ "--if-not-exists",
+ ],
+ service_name="broker",
+ )
Review Comment:
Topic creation doesn’t pin partitions/replication-factor, which can vary by
broker defaults and can break the offset assertions/parsing (especially if
partitions > 1). For deterministic E2E behavior, create topics with explicit
`--partitions 1` and `--replication-factor 1` (or adapt offset
parsing/assertions to handle multiple partitions).
##########
airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/test_event_driven.py:
##########
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""E2E tests for the event-driven DAG pattern using Apache Kafka.
+
+The producer DAG sends 8 valid orders + 1 malformed message to the
``fizz_buzz``
+Kafka topic. The consumer DAG is scheduled on an Asset with a Kafka
AssetWatcher,
+so each message triggers a separate consumer DAG run (9 total).
+"""
+
+from __future__ import annotations
+
+import time
+
+from airflow_e2e_tests.e2e_test_utils.clients import AirflowClient
+
+PRODUCER_DAG_ID = "event_driven_producer"
+CONSUMER_DAG_ID = "event_driven_consumer"
+# 8 valid orders + 1 malformed message
+EXPECTED_CONSUMER_RUNS = 9
+EXPECTED_FIZZ_BUZZ_OFFSET = 9
+EXPECTED_DLQ_OFFSET = 1
+
+
+def _parse_topic_offset(raw_output: str, topic: str) -> int:
+ """Parse ``kafka-get-offsets`` output (``topic:partition:offset``) and
return the offset."""
+ for line in raw_output.strip().splitlines():
+ parts = line.strip().split(":")
+ if len(parts) == 3 and parts[0] == topic:
+ return int(parts[2])
+ raise ValueError(f"Could not find offset for topic '{topic}' in
output:\n{raw_output}")
+
+
+class TestEventDrivenDag:
+ """Test the Kafka-based event-driven producer/consumer DAG pair."""
+
+ airflow_client = AirflowClient()
+
+ # ------------------------------------------------------------------
+ # Helpers
+ # ------------------------------------------------------------------
+
+ def _wait_for_consumer_dag_runs(
+ self, expected_count: int, timeout: int = 600, check_interval: int = 10
+ ) -> list[dict]:
+ """Poll until *expected_count* consumer DAG runs reach a terminal
state."""
+ start = time.monotonic()
+ while time.monotonic() - start < timeout:
+ response = self.airflow_client.list_dag_runs(CONSUMER_DAG_ID)
+ runs = response.get("dag_runs", [])
+ terminal_runs = [r for r in runs if r["state"] in ("success",
"failed")]
+ if len(terminal_runs) >= expected_count:
+ return terminal_runs
+ time.sleep(check_interval)
+
+ # Timed out — gather diagnostics
+ response = self.airflow_client.list_dag_runs(CONSUMER_DAG_ID)
+ runs = response.get("dag_runs", [])
+ states = {r["dag_run_id"]: r["state"] for r in runs}
+ raise TimeoutError(
+ f"Expected {expected_count} terminal consumer DAG runs within
{timeout}s, "
+ f"but only found {len([r for r in runs if r['state'] in
('success', 'failed')])}. "
+ f"Run states: {states}"
+ )
+
+ def _get_topic_offset(self, compose_instance, topic: str) -> int:
+ """Return the current end-offset of *topic* via ``kafka-get-offsets``
inside the broker."""
+ stdout, _ = compose_instance.exec_in_container(
+ command=[
+ "kafka-get-offsets",
+ "--bootstrap-server",
+ "broker:29092",
+ "--topic",
+ topic,
+ ],
+ service_name="broker",
+ )
+ output = stdout.decode() if isinstance(stdout, bytes) else stdout
+ return _parse_topic_offset(output, topic)
+
+ # ------------------------------------------------------------------
+ # Test
+ # ------------------------------------------------------------------
+
+ def test_producer_triggers_consumer_and_kafka_offsets(self,
compose_instance):
+ """Trigger the producer once and verify 9 consumer runs and Kafka
offsets.
+
+ Steps:
+ 1. Unpause the consumer DAG so the triggerer starts the AssetWatcher.
+ 2. Wait for the Kafka MessageQueueTrigger to begin polling.
+ 3. Trigger the producer DAG and wait for it to succeed.
+ 4. Wait for 9 consumer DAG runs to reach a terminal state.
+ 5. Verify that the ``fizz_buzz`` topic has offset 9 (all messages
produced).
+ 6. Verify that the ``dlq`` topic has offset 1 (the malformed message).
+ """
+ # 1. Unpause consumer so the triggerer registers the AssetWatcher
+ self.airflow_client.un_pause_dag(CONSUMER_DAG_ID)
+
+ # 2. Give the triggerer time to start the MessageQueueTrigger and
subscribe.
+ # The trigger uses poll_interval=1 and auto.offset.reset=latest so
it
+ # must be actively polling before the producer writes.
+ time.sleep(30)
Review Comment:
A fixed 30s sleep adds a hard minimum runtime to the suite and is a common
source of CI slowness. Prefer polling for readiness (e.g., wait until the
consumer group is registered in Kafka, or until Airflow reports the
watcher/trigger is active) with a timeout; this keeps the test fast when the
system becomes ready quickly while still being robust.
##########
airflow-e2e-tests/docker/kafka.yml:
##########
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+services:
+ broker:
+ image: confluentinc/cp-kafka:7.3.0
+ labels:
+ breeze.description: "Integration required for Kafka hooks."
+ hostname: broker
+ container_name: broker
+ ports:
+ - "9092:9092"
+ - "9101:9101"
Review Comment:
Setting a fixed `container_name` can cause conflicts if multiple compose
projects run on the same Docker host (common in local dev or parallel test
scenarios). Also, exposing host ports is often unnecessary for intra-compose
communication (Airflow uses `broker:29092`) and can create port-collision
failures. Consider removing `container_name` and only publishing ports if there
is a demonstrated need.
```suggestion
ports:
- "9092:9092"
```
##########
airflow-e2e-tests/tests/airflow_e2e_tests/event_driven_tests/test_event_driven.py:
##########
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""E2E tests for the event-driven DAG pattern using Apache Kafka.
+
+The producer DAG sends 8 valid orders + 1 malformed message to the
``fizz_buzz``
+Kafka topic. The consumer DAG is scheduled on an Asset with a Kafka
AssetWatcher,
+so each message triggers a separate consumer DAG run (9 total).
+"""
+
+from __future__ import annotations
+
+import time
+
+from airflow_e2e_tests.e2e_test_utils.clients import AirflowClient
+
+PRODUCER_DAG_ID = "event_driven_producer"
+CONSUMER_DAG_ID = "event_driven_consumer"
+# 8 valid orders + 1 malformed message
+EXPECTED_CONSUMER_RUNS = 9
+EXPECTED_FIZZ_BUZZ_OFFSET = 9
+EXPECTED_DLQ_OFFSET = 1
+
+
+def _parse_topic_offset(raw_output: str, topic: str) -> int:
+ """Parse ``kafka-get-offsets`` output (``topic:partition:offset``) and
return the offset."""
+ for line in raw_output.strip().splitlines():
+ parts = line.strip().split(":")
+ if len(parts) == 3 and parts[0] == topic:
+ return int(parts[2])
+ raise ValueError(f"Could not find offset for topic '{topic}' in
output:\n{raw_output}")
+
+
+class TestEventDrivenDag:
+ """Test the Kafka-based event-driven producer/consumer DAG pair."""
+
+ airflow_client = AirflowClient()
+
+ # ------------------------------------------------------------------
+ # Helpers
+ # ------------------------------------------------------------------
+
+ def _wait_for_consumer_dag_runs(
+ self, expected_count: int, timeout: int = 600, check_interval: int = 10
+ ) -> list[dict]:
+ """Poll until *expected_count* consumer DAG runs reach a terminal
state."""
+ start = time.monotonic()
+ while time.monotonic() - start < timeout:
+ response = self.airflow_client.list_dag_runs(CONSUMER_DAG_ID)
+ runs = response.get("dag_runs", [])
+ terminal_runs = [r for r in runs if r["state"] in ("success",
"failed")]
+ if len(terminal_runs) >= expected_count:
+ return terminal_runs
+ time.sleep(check_interval)
+
+ # Timed out — gather diagnostics
+ response = self.airflow_client.list_dag_runs(CONSUMER_DAG_ID)
+ runs = response.get("dag_runs", [])
+ states = {r["dag_run_id"]: r["state"] for r in runs}
+ raise TimeoutError(
+ f"Expected {expected_count} terminal consumer DAG runs within
{timeout}s, "
+ f"but only found {len([r for r in runs if r['state'] in
('success', 'failed')])}. "
+ f"Run states: {states}"
+ )
+
+ def _get_topic_offset(self, compose_instance, topic: str) -> int:
+ """Return the current end-offset of *topic* via ``kafka-get-offsets``
inside the broker."""
+ stdout, _ = compose_instance.exec_in_container(
+ command=[
+ "kafka-get-offsets",
+ "--bootstrap-server",
+ "broker:29092",
+ "--topic",
+ topic,
+ ],
+ service_name="broker",
+ )
+ output = stdout.decode() if isinstance(stdout, bytes) else stdout
+ return _parse_topic_offset(output, topic)
+
+ # ------------------------------------------------------------------
+ # Test
+ # ------------------------------------------------------------------
+
+ def test_producer_triggers_consumer_and_kafka_offsets(self,
compose_instance):
+ """Trigger the producer once and verify 9 consumer runs and Kafka
offsets.
+
+ Steps:
+ 1. Unpause the consumer DAG so the triggerer starts the AssetWatcher.
+ 2. Wait for the Kafka MessageQueueTrigger to begin polling.
+ 3. Trigger the producer DAG and wait for it to succeed.
+ 4. Wait for 9 consumer DAG runs to reach a terminal state.
+ 5. Verify that the ``fizz_buzz`` topic has offset 9 (all messages
produced).
+ 6. Verify that the ``dlq`` topic has offset 1 (the malformed message).
+ """
+ # 1. Unpause consumer so the triggerer registers the AssetWatcher
+ self.airflow_client.un_pause_dag(CONSUMER_DAG_ID)
+
+ # 2. Give the triggerer time to start the MessageQueueTrigger and
subscribe.
+ # The trigger uses poll_interval=1 and auto.offset.reset=latest so
it
+ # must be actively polling before the producer writes.
+ time.sleep(30)
+
+ # 3. Trigger producer and wait for it to complete
+ producer_state =
self.airflow_client.trigger_dag_and_wait(PRODUCER_DAG_ID)
+ assert producer_state == "success", f"Producer DAG did not succeed.
Final state: {producer_state}"
+
+ # 4. Wait for all 9 consumer DAG runs
+ consumer_runs =
self._wait_for_consumer_dag_runs(expected_count=EXPECTED_CONSUMER_RUNS)
+ assert len(consumer_runs) == EXPECTED_CONSUMER_RUNS, (
+ f"Expected {EXPECTED_CONSUMER_RUNS} consumer runs, got
{len(consumer_runs)}"
+ )
Review Comment:
`_wait_for_consumer_dag_runs` returns as soon as terminal runs are `>=
expected_count`, but returns *all* terminal runs. If there are more than
`EXPECTED_CONSUMER_RUNS` (e.g., retries creating additional runs in some
setups, or any pre-existing runs), the subsequent equality assertion will fail.
Consider returning exactly `expected_count` runs (e.g., filter/sort to the most
recent ones) or change the assertion to accept `>=` after filtering to runs
created by this test.
```suggestion
assert len(consumer_runs) >= EXPECTED_CONSUMER_RUNS, (
f"Expected at least {EXPECTED_CONSUMER_RUNS} consumer runs, got
{len(consumer_runs)}"
)
consumer_runs = sorted(
consumer_runs,
key=lambda r: (
r.get("end_date") or "",
r.get("start_date") or "",
r.get("logical_date") or "",
r.get("dag_run_id") or "",
),
)[-EXPECTED_CONSUMER_RUNS:]
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]