This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new d5620a47b56 [v3-2-test] Aggregate CI-image dependency groups so providers can register non-default extras with a one-line change (#67130) (#67139)
d5620a47b56 is described below
commit d5620a47b56095fdddaa42c16190c43264dd8273
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 2 19:47:11 2026 +0200
[v3-2-test] Aggregate CI-image dependency groups so providers can register non-default extras with a one-line change (#67130) (#67139)
* Fix OTel timer metrics using Gauge instead of Histogram (#64207) (#66865)
* Fix OTel timer metrics using Gauge instead of Histogram
* Use ExponentialBucketHistogramAggregation for timing metrics
* Use public API import path for ExponentialBucketHistogramAggregation and fix histogram map isolation
(cherry picked from commit b2dadd2b7623d0d99f6fea0521bf008b7b957cac)
Co-authored-by: namratachaudhary <[email protected]>
* Add name fields to SDK deadline alerts (#64926) (#65601)
* Add deadlines support with name and description fields in alerts and UI
* Add 'viewAll' label to deadlineStatus in dag.json
* Refactor deadlineAlerts referenceType structure
* Refine deadline alert translations in dag.json
Updated deadline alert messages for clarity and consistency.
* Add completion rule text for deadline alerts in UI
* Remove duplicate completionRule entry in deadlineAlerts
* Remove alert description from DeadlineAlert and related models
* Enhance deadline handling: return name updates alongside UUID mapping in SerializedDagModel
* Add alert_id field to DeadlineResponse and update tests for alert handling
* Remove DEADLINES option from MenuItem enum
* Update airflow-core/src/airflow/serialization/encoders.py
---------
(cherry picked from commit e9d106667c7fe98f77c1312bb082c010b472c388)
Co-authored-by: Richard Wu <[email protected]>
Co-authored-by: D. Ferruzzi <[email protected]>
* [v3-2-test] Allow accessing a TaskGroup's members via `[]` (#64430) (#65707)
(cherry picked from commit aa54031f3cece54b1492bb13648bd521d5bf3082)
Co-authored-by: Dev-iL <[email protected]>
* [v3-2-test] Skip test_schedule_tis_start_trigger pending #55068 backport decision (#66315)
The `start_from_trigger` deferred-at-schedule path is commented out
on v3-2-test (disabled in 91e10295c7d as a TODO) and was only
re-enabled on main by #55068, which landed one day after the v3-2
branch was cut. The test asserts the feature works and fails on
v3-2-test CI.
Skip the test on the 3.2 line and link the tracking issue from both
the test skip-reason and the disabled production-code site so the
follow-up isn't lost. Decision (backport #55068 or formally drop
the feature on 3.2) is tracked in #66307.
* [v3-2-test] Aggregate CI-image dependency groups so providers can register non-default extras with a one-line change (#67130)
(cherry picked from commit e61640e2a1bceb3ee864d2dd552f623ac76bfb2d)
Co-authored-by: Bugra Ozturk <[email protected]>
---------
Co-authored-by: Rahul Vats <[email protected]>
Co-authored-by: namratachaudhary <[email protected]>
Co-authored-by: Pierre Jeambrun <[email protected]>
Co-authored-by: Richard Wu <[email protected]>
Co-authored-by: D. Ferruzzi <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Dev-iL <[email protected]>
Co-authored-by: Jarek Potiuk <[email protected]>
Co-authored-by: Bugra Ozturk <[email protected]>
---
Dockerfile | 12 ++---
Dockerfile.ci | 12 ++---
airflow-core/newsfragments/64207.significant.rst | 1 +
.../api_fastapi/core_api/datamodels/ui/deadline.py | 4 +-
.../api_fastapi/core_api/openapi/_private_ui.yaml | 9 ++--
airflow-core/src/airflow/models/serialized_dag.py | 39 +++++++++++---
airflow-core/src/airflow/serialization/decoders.py | 1 +
.../airflow/serialization/definitions/deadline.py | 2 +
airflow-core/src/airflow/serialization/encoders.py | 1 +
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 11 ++--
.../airflow/ui/openapi-gen/requests/types.gen.ts | 2 +-
.../core_api/routes/ui/test_deadlines.py | 12 ++---
.../tests/unit/models/test_deadline_alert.py | 4 --
.../tests/unit/models/test_serialized_dag.py | 62 ++++++++++++++++++++++
contributing-docs/12_provider_distributions.rst | 43 +++++++++++++++
.../utils/constraints_version_check.py | 8 +--
pyproject.toml | 13 +++++
.../docker/install_airflow_when_building_images.sh | 12 ++---
.../observability/metrics/otel_logger.py | 58 +++++++++++++++++---
.../observability/metrics/test_otel_logger.py | 47 +++++++++++-----
task-sdk/src/airflow/sdk/definitions/deadline.py | 2 +
task-sdk/src/airflow/sdk/definitions/taskgroup.py | 4 ++
.../tests/task_sdk/definitions/test_taskgroup.py | 25 +++++++++
23 files changed, 311 insertions(+), 73 deletions(-)
diff --git a/Dockerfile b/Dockerfile
index 9551f4d4d0d..e8c59d469b5 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1228,8 +1228,8 @@ function install_from_sources() {
# (binary lxml embeds its own libxml2, while xmlsec uses system one).
# See https://bugs.launchpad.net/lxml/+bug/2110068
set -x
- uv sync --all-packages --resolution highest --group dev --group docs --group docs-gen \
- --group leveldb ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
+ uv sync --all-packages --resolution highest --group ci-image \
+ ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
--no-python-downloads --no-managed-python
else
set +x
@@ -1241,8 +1241,8 @@ function install_from_sources() {
# libxml2 (binary lxml embeds its own libxml2, while xmlsec uses system one).
# See https://bugs.launchpad.net/lxml/+bug/2110068
set -x
- if ! uv sync --all-packages --frozen --group dev --group docs --group docs-gen \
- --group leveldb ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
+ if ! uv sync --all-packages --frozen --group ci-image \
+ ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
--no-python-downloads --no-managed-python; then
set +x
if [[ ${AIRFLOW_FALLBACK_NO_CONSTRAINTS_INSTALLATION} != "true" ]]; then
@@ -1257,8 +1257,8 @@ function install_from_sources() {
echo "${COLOR_BLUE}Falling back to re-resolving dependencies (uv sync without --frozen).${COLOR_RESET}"
echo
set -x
- uv sync --all-packages --group dev --group docs --group docs-gen \
- --group leveldb ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
+ uv sync --all-packages --group ci-image \
+ ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
--no-python-downloads --no-managed-python
set +x
fi
diff --git a/Dockerfile.ci b/Dockerfile.ci
index 4c39038e021..dab1755cf62 100644
--- a/Dockerfile.ci
+++ b/Dockerfile.ci
@@ -933,8 +933,8 @@ function install_from_sources() {
# (binary lxml embeds its own libxml2, while xmlsec uses system one).
# See https://bugs.launchpad.net/lxml/+bug/2110068
set -x
- uv sync --all-packages --resolution highest --group dev --group docs --group docs-gen \
- --group leveldb ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
+ uv sync --all-packages --resolution highest --group ci-image \
+ ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
--no-python-downloads --no-managed-python
else
set +x
@@ -946,8 +946,8 @@ function install_from_sources() {
# libxml2 (binary lxml embeds its own libxml2, while xmlsec uses system one).
# See https://bugs.launchpad.net/lxml/+bug/2110068
set -x
- if ! uv sync --all-packages --frozen --group dev --group docs --group docs-gen \
- --group leveldb ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
+ if ! uv sync --all-packages --frozen --group ci-image \
+ ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
--no-python-downloads --no-managed-python; then
set +x
if [[ ${AIRFLOW_FALLBACK_NO_CONSTRAINTS_INSTALLATION} != "true" ]]; then
@@ -962,8 +962,8 @@ function install_from_sources() {
echo "${COLOR_BLUE}Falling back to re-resolving dependencies (uv sync without --frozen).${COLOR_RESET}"
echo
set -x
- uv sync --all-packages --group dev --group docs --group docs-gen \
- --group leveldb ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
+ uv sync --all-packages --group ci-image \
+ ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
--no-python-downloads --no-managed-python
set +x
fi
diff --git a/airflow-core/newsfragments/64207.significant.rst b/airflow-core/newsfragments/64207.significant.rst
new file mode 100644
index 00000000000..3254fa20a54
--- /dev/null
+++ b/airflow-core/newsfragments/64207.significant.rst
@@ -0,0 +1 @@
+OTel timer and timing metrics now use Histogram instead of Gauge, preserving count, sum, and bucket distribution across recordings.
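The reason for the switch can be shown with a minimal pure-Python sketch. This is not the OpenTelemetry SDK. `LastValueGauge` and `TimingHistogram` are hypothetical classes, and the fixed millisecond bucket bounds are an assumption chosen for readability; the change itself configures OTel's `ExponentialBucketHistogramAggregation`. When both receive the same timer observations, the gauge keeps only the last value, while the histogram keeps the count, the sum, and the distribution.

```python
from __future__ import annotations

import bisect
from dataclasses import dataclass, field


@dataclass
class LastValueGauge:
    """Keeps only the latest observation, like setting a Gauge on every timer stop."""

    value: float | None = None

    def set(self, v: float) -> None:
        self.value = v  # every recording overwrites the previous one


@dataclass
class TimingHistogram:
    """Accumulates count, sum and per-bucket counts across all observations."""

    # Upper bucket bounds in ms (an assumption; OTel's exponential buckets differ).
    bounds: tuple[float, ...] = (10.0, 100.0, 1000.0)
    count: int = 0
    total: float = 0.0
    buckets: list[int] = field(init=False)

    def __post_init__(self) -> None:
        # One bucket per bound, plus an overflow bucket above the last bound.
        self.buckets = [0] * (len(self.bounds) + 1)

    def record(self, v: float) -> None:
        self.count += 1
        self.total += v
        # bisect_left picks the first bucket whose upper bound is >= v.
        self.buckets[bisect.bisect_left(self.bounds, v)] += 1


durations_ms = [12.0, 250.0, 0.0, 40.0]
gauge, hist = LastValueGauge(), TimingHistogram()
for d in durations_ms:
    gauge.set(d)
    hist.record(d)

print(gauge.value)  # 40.0
print(hist.count, hist.total, hist.buckets)  # 4 302.0 [1, 2, 1, 0]
```

The `0.0` observation is included on purpose. The previous `if self.duration:` guard in `_OtelTimer.stop` treated a zero duration as falsy and skipped it. The new `is not None` check records it.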
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py
index 1ceddeac318..7d616d0449d 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/deadline.py
@@ -33,10 +33,8 @@ class DeadlineResponse(BaseModel):
deadline_time: datetime
missed: bool
created_at: datetime
+ alert_id: UUID | None = Field(validation_alias="deadline_alert_id", default=None)
alert_name: str | None = Field(validation_alias=AliasPath("deadline_alert", "name"), default=None)
- alert_description: str | None = Field(
- validation_alias=AliasPath("deadline_alert", "description"), default=None
- )
class DeadlineCollectionResponse(BaseModel):
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index 63ea1bc04d0..007caac1b1d 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -2284,16 +2284,17 @@ components:
type: string
format: date-time
title: Created At
- alert_name:
+ alert_id:
anyOf:
- type: string
+ format: uuid
- type: 'null'
- title: Alert Name
- alert_description:
+ title: Alert Id
+ alert_name:
anyOf:
- type: string
- type: 'null'
- title: Alert Description
+ title: Alert Name
type: object
required:
- id
diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py
index 18ad03546ef..be55c3e0908 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -431,7 +431,7 @@ class SerializedDagModel(Base):
existing_deadline_uuids: list[str],
new_deadline_data: list[dict],
session: Session,
- ) -> dict[str, dict] | None:
+ ) -> tuple[dict[str, dict], dict[str, str | None]] | None:
"""
Try to reuse existing deadline UUIDs if the deadline definitions haven't changed.
@@ -440,7 +440,11 @@ class SerializedDagModel(Base):
:param existing_deadline_uuids: List of UUID strings from existing serialized Dag
:param new_deadline_data: List of new deadline alert data dicts from the Dag
:param session: Database session
- :return: UUID mapping dict if all match, None if any mismatch detected
+ :return: Tuple of (uuid_mapping, name_updates) if all definitions match, None if any
+ mismatch detected. ``uuid_mapping`` maps UUID string → new deadline data dict.
+ ``name_updates`` maps UUID string → new name **only** for entries whose name
+ changed relative to the existing DB row, so callers can issue targeted UPDATEs
+ and reliably detect whether any DB write occurred.
"""
# defensive check for old 3.1.x format
if existing_deadline_uuids and not isinstance(existing_deadline_uuids[0], str):
@@ -468,6 +472,7 @@ class SerializedDagModel(Base):
matched_uuids: set[UUID] = set()
uuid_mapping: dict[str, dict] = {}
+ name_updates: dict[str, str | None] = {}
for deadline_alert in new_deadline_data:
deadline_data = deadline_alert.get(Encoding.VAR, deadline_alert)
@@ -479,9 +484,13 @@ class SerializedDagModel(Base):
if _definitions_match(deadline_data, existing_alert):
# Found a match, reuse this UUID
- uuid_mapping[str(existing_alert.id)] = deadline_data
+ uuid_str = str(existing_alert.id)
+ uuid_mapping[uuid_str] = deadline_data
matched_uuids.add(existing_alert.id)
found_match = True
+ new_name = deadline_data.get(DeadlineAlertFields.NAME)
+ if new_name != existing_alert.name:
+ name_updates[uuid_str] = new_name
break
if not found_match:
@@ -490,7 +499,7 @@ class SerializedDagModel(Base):
# to another deadline), so partial reuse would risk stale cross-references.
return None
- return uuid_mapping
+ return uuid_mapping, name_updates
@classmethod
def _create_deadline_alert_records(
@@ -510,6 +519,7 @@ class SerializedDagModel(Base):
for uuid_str, deadline_data in uuid_mapping.items():
alert = DeadlineAlertModel(
id=UUID(uuid_str),
+ name=deadline_data.get(DeadlineAlertFields.NAME),
reference=deadline_data[DeadlineAlertFields.REFERENCE],
interval=deadline_data[DeadlineAlertFields.INTERVAL],
callback_def=deadline_data[DeadlineAlertFields.CALLBACK],
@@ -625,6 +635,7 @@ class SerializedDagModel(Base):
serialized_dag_hash = _prefetched.dag_hash
dag_version = _prefetched.dag_version
+ name_updated = False
if dag.data.get("dag", {}).get("deadline"):
# Try to reuse existing deadline UUIDs if the deadline definitions haven't changed.
# This preserves the hash and avoids unnecessary SerializedDagModel recreations.
@@ -637,16 +648,24 @@ class SerializedDagModel(Base):
and existing_serialized_dag.data
and (existing_deadline_uuids := existing_serialized_dag.data.get("dag", {}).get("deadline"))
):
- deadline_uuid_mapping = cls._try_reuse_deadline_uuids(
+ reuse_result = cls._try_reuse_deadline_uuids(
existing_deadline_uuids,
dag.data["dag"]["deadline"],
session,
)
- if deadline_uuid_mapping is not None:
+ if reuse_result is not None:
+ deadline_uuid_mapping, name_updates = reuse_result
# All deadlines matched — reuse the UUIDs to preserve hash.
- # Clear the mapping since the alert rows already exist in the DB;
- # no need to delete and recreate identical records.
+ # Only issue UPDATE statements for rows whose name actually changed to
+ # avoid unnecessary writes and to make the return value accurate.
+ for uuid_str, new_name in name_updates.items():
+ session.execute(
+ update(DeadlineAlertModel)
+ .where(DeadlineAlertModel.id == UUID(uuid_str))
+ .values(name=new_name)
+ )
+ name_updated = bool(name_updates)
dag.data["dag"]["deadline"] = existing_deadline_uuids
deadline_uuid_mapping = {}
else:
@@ -665,6 +684,10 @@ class SerializedDagModel(Base):
and dag_version
and dag_version.bundle_name == bundle_name
):
+ if name_updated:
+ # The serialized DAG itself is unchanged, but deadline alert name(s) were
+ # updated in the DB, so report True so callers know a write did occur.
+ return True
log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
return False
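The reuse-and-rename logic above can be condensed into a self-contained sketch. Here, existing alert rows and new deadline definitions are plain dicts. `try_reuse`, `definitions_match` and the string keys are hypothetical stand-ins for `_try_reuse_deadline_uuids`, `_definitions_match` and `DeadlineAlertFields`. The sketch assumes, as the new test suggests, that matching compares reference, interval and callback but not name. A rename therefore keeps its UUID and shows up only in `name_updates`.

```python
from __future__ import annotations

# Fields compared to decide whether a deadline is "the same"; "name" is left out
# on purpose so a rename does not force a new UUID (assumption, see above).
IDENTITY_KEYS = ("reference", "interval", "callback")


def definitions_match(new: dict, existing: dict) -> bool:
    return all(new.get(key) == existing.get(key) for key in IDENTITY_KEYS)


def try_reuse(
    existing_rows: dict[str, dict], new_defs: list[dict]
) -> tuple[dict[str, dict], dict[str, str | None]] | None:
    """Return (uuid_mapping, name_updates), or None if any definition is unmatched."""
    matched: set[str] = set()
    uuid_mapping: dict[str, dict] = {}
    name_updates: dict[str, str | None] = {}
    for new in new_defs:
        for uuid_str, row in existing_rows.items():
            if uuid_str in matched or not definitions_match(new, row):
                continue
            uuid_mapping[uuid_str] = new
            matched.add(uuid_str)
            # Only record a rename when the name really differs from the stored row.
            if new.get("name") != row.get("name"):
                name_updates[uuid_str] = new.get("name")
            break
        else:
            # All-or-nothing: one unmatched definition means no UUID is reused.
            return None
    return uuid_mapping, name_updates


existing = {
    "uuid-1": {
        "name": "original name",
        "reference": "DAGRUN_QUEUED_AT",
        "interval": 300.0,
        "callback": "empty_callback",
    },
}
renamed = [{**existing["uuid-1"], "name": "updated name"}]

mapping, updates = try_reuse(existing, renamed)
print(updates)  # {'uuid-1': 'updated name'}

# Mirrors write_dag: the DAG hash is unchanged, but a rename still counts as a write.
did_write = bool(updates)

# Changing a defining field (here the interval) disables reuse entirely.
changed = [{**renamed[0], "interval": 600.0}]
print(try_reuse(existing, changed))  # None
```

`name_updates` holds only names that actually differ from the stored rows. That is why the caller can use `bool(name_updates)` as an accurate signal of whether any DB write happened.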
diff --git a/airflow-core/src/airflow/serialization/decoders.py b/airflow-core/src/airflow/serialization/decoders.py
index a25068443e4..27db6134b13 100644
--- a/airflow-core/src/airflow/serialization/decoders.py
+++ b/airflow-core/src/airflow/serialization/decoders.py
@@ -166,6 +166,7 @@ def decode_deadline_alert(encoded_data: dict):
reference=reference,
interval=datetime.timedelta(seconds=data[DeadlineAlertFields.INTERVAL]),
callback=deserialize(data[DeadlineAlertFields.CALLBACK]),
+ name=data.get(DeadlineAlertFields.NAME),
)
diff --git a/airflow-core/src/airflow/serialization/definitions/deadline.py b/airflow-core/src/airflow/serialization/definitions/deadline.py
index 061883c6fcd..58eaa46e6f7 100644
--- a/airflow-core/src/airflow/serialization/definitions/deadline.py
+++ b/airflow-core/src/airflow/serialization/definitions/deadline.py
@@ -49,6 +49,7 @@ class DeadlineAlertFields:
serializing DeadlineAlert instances to and from their dictionary representation.
"""
+ NAME = "name"
REFERENCE = "reference"
INTERVAL = "interval"
CALLBACK = "callback"
@@ -367,3 +368,4 @@ class SerializedDeadlineAlert:
reference: SerializedReferenceModels.SerializedBaseDeadlineReference
interval: timedelta
callback: Any
+ name: str | None = None
diff --git a/airflow-core/src/airflow/serialization/encoders.py b/airflow-core/src/airflow/serialization/encoders.py
index 2f30511a1e5..87d7ef1f137 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -211,6 +211,7 @@ def encode_deadline_alert(d: DeadlineAlert | SerializedDeadlineAlert) -> dict[st
from airflow.sdk.serde import serialize
return {
+ "name": d.name,
"reference": encode_deadline_reference(d.reference),
"interval": d.interval.total_seconds(),
"callback": serialize(d.callback),
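The encoder now always emits `"name"`, and the decoder reads it with `data.get(...)`. Payloads serialized before this change carry no `"name"` key, so they should still decode, with the name falling back to `None`. Below is a minimal round-trip sketch. It uses a hypothetical `Alert` dataclass and simplified `encode`/`decode` helpers in place of `encode_deadline_alert`/`decode_deadline_alert`, and reduces reference and callback serialization to plain strings.

```python
from __future__ import annotations

import datetime
from dataclasses import dataclass


@dataclass
class Alert:
    """Hypothetical stand-in for SerializedDeadlineAlert; name is optional."""

    reference: str
    interval: datetime.timedelta
    callback: str
    name: str | None = None


def encode(alert: Alert) -> dict:
    return {
        "name": alert.name,
        "reference": alert.reference,
        "interval": alert.interval.total_seconds(),  # seconds, as in the encoder
        "callback": alert.callback,
    }


def decode(data: dict) -> Alert:
    return Alert(
        reference=data["reference"],
        interval=datetime.timedelta(seconds=data["interval"]),
        callback=data["callback"],
        name=data.get("name"),  # .get() keeps pre-change payloads decodable
    )


alert = Alert(
    "DAGRUN_QUEUED_AT",
    datetime.timedelta(minutes=5),
    "empty_callback",
    name="SLA Breach Alert",
)
round_tripped = decode(encode(alert))
print(round_tripped == alert)  # True

legacy_payload = {"reference": "DAGRUN_QUEUED_AT", "interval": 300.0, "callback": "empty_callback"}
print(decode(legacy_payload).name)  # None
```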
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 1ae88b9433e..a5746641938 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -8128,18 +8128,19 @@ export const $DeadlineResponse = {
format: 'date-time',
title: 'Created At'
},
- alert_name: {
+ alert_id: {
anyOf: [
{
- type: 'string'
+ type: 'string',
+ format: 'uuid'
},
{
type: 'null'
}
],
- title: 'Alert Name'
+ title: 'Alert Id'
},
- alert_description: {
+ alert_name: {
anyOf: [
{
type: 'string'
@@ -8148,7 +8149,7 @@ export const $DeadlineResponse = {
type: 'null'
}
],
- title: 'Alert Description'
+ title: 'Alert Name'
}
},
type: 'object',
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index cdd4d73211b..1c43301db52 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -2008,8 +2008,8 @@ export type DeadlineResponse = {
deadline_time: string;
missed: boolean;
created_at: string;
+ alert_id?: string | null;
alert_name?: string | null;
- alert_description?: string | null;
};
/**
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_deadlines.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_deadlines.py
index 4dec594eb64..fb69818fa18 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_deadlines.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_deadlines.py
@@ -52,7 +52,6 @@ RUN_MULTI = "run_multi" # 3 deadlines added out-of-order (ordering test)
RUN_OTHER = "run_other" # has 1 deadline; used to verify per-run isolation
ALERT_NAME = "SLA Breach Alert"
-ALERT_DESCRIPTION = "Fires when SLA is breached"
_CALLBACK_PATH = "tests.unit.api_fastapi.core_api.routes.ui.test_deadlines._noop_callback"
@@ -154,7 +153,6 @@ def setup(dag_maker, session):
alert = DeadlineAlert(
serialized_dag_id=serialized_dag.id,
name=ALERT_NAME,
- description=ALERT_DESCRIPTION,
reference=DeadlineReference.DAGRUN_QUEUED_AT.serialize_reference(),
interval=3600.0,
callback_def={"path": _CALLBACK_PATH},
@@ -226,7 +224,7 @@ class TestGetDagRunDeadlines:
assert deadline1["deadline_time"] == "2025-01-01T12:00:00Z"
assert deadline1["missed"] is False
assert deadline1["alert_name"] is None
- assert deadline1["alert_description"] is None
+ assert deadline1["alert_id"] is None
assert "id" in deadline1
assert "created_at" in deadline1
@@ -237,14 +235,16 @@ class TestGetDagRunDeadlines:
assert data["total_entries"] == 1
assert data["deadlines"][0]["missed"] is True
- def test_deadline_with_alert_name_and_description(self, test_client):
+ def test_deadline_with_alert_name(self, test_client, session):
+ alert = session.scalar(select(DeadlineAlert).where(DeadlineAlert.name == ALERT_NAME))
with assert_queries_count(4):
response = test_client.get(f"/dags/{DAG_ID}/dagRuns/{RUN_ALERT}/deadlines")
assert response.status_code == 200
data = response.json()
assert data["total_entries"] == 1
- assert data["deadlines"][0]["alert_name"] == ALERT_NAME
- assert data["deadlines"][0]["alert_description"] == ALERT_DESCRIPTION
+ dl = data["deadlines"][0]
+ assert dl["alert_name"] == ALERT_NAME
+ assert dl["alert_id"] == str(alert.id)
def test_deadlines_ordered_by_deadline_time_ascending(self, test_client):
with assert_queries_count(4):
diff --git a/airflow-core/tests/unit/models/test_deadline_alert.py b/airflow-core/tests/unit/models/test_deadline_alert.py
index 9d69577a6d6..a9b1854f6ab 100644
--- a/airflow-core/tests/unit/models/test_deadline_alert.py
+++ b/airflow-core/tests/unit/models/test_deadline_alert.py
@@ -34,7 +34,6 @@ from unit.models import DEFAULT_DATE
DAG_ID = "test_deadline_alert_dag"
DEADLINE_NAME = "Test Alert"
-DEADLINE_DESCRIPTION = "This is a test alert description"
DEADLINE_INTERVAL = 60
DEADLINE_CALLBACK = {"path": "test.callback"}
SERIALIZED_DAG_ID = "serialized_dag_uuid"
@@ -62,7 +61,6 @@ def deadline_alert_orm(dag_maker, session, deadline_reference):
alert = DeadlineAlert(
serialized_dag_id=serialized_dag.id,
name=DEADLINE_NAME,
- description=DEADLINE_DESCRIPTION,
reference=deadline_reference,
interval=DEADLINE_INTERVAL,
callback_def=DEADLINE_CALLBACK,
@@ -86,7 +84,6 @@ class TestDeadlineAlert:
assert deadline_alert_orm.id is not None
assert deadline_alert_orm.created_at == DEFAULT_DATE
assert deadline_alert_orm.name == DEADLINE_NAME
- assert deadline_alert_orm.description == DEADLINE_DESCRIPTION
def test_minimal_deadline_alert_creation(self, dag_maker, session, deadline_reference):
with dag_maker(DAG_ID, session=session):
@@ -109,7 +106,6 @@ class TestDeadlineAlert:
assert deadline_alert.id is not None
assert deadline_alert.created_at == DEFAULT_DATE
assert deadline_alert.name is None
- assert deadline_alert.description is None
def test_deadline_alert_repr(self, deadline_alert_orm, deadline_reference):
repr_str = repr(deadline_alert_orm)
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py
index 2185635590e..54438f8e82f 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -850,3 +850,65 @@ class TestSerializedDagModel:
assert new_serdag_count == 2
assert new_serdag.dag_hash != orig_serdag.dag_hash
assert new_alert.interval == 600.0
+
+ def test_deadline_name_change_updates_db_and_returns_true(self, testing_dag_bundle, session):
+ """Name-only deadline change: UUID reused, DB row updated, write_dag returns True."""
+ dag_id = "test_deadline_name_change"
+
+ dag = DAG(
+ dag_id=dag_id,
+ deadline=DeadlineAlert(
+ reference=DeadlineReference.DAGRUN_QUEUED_AT,
+ interval=timedelta(minutes=5),
+ callback=AsyncCallback(empty_callback_for_deadline),
+ name="original name",
+ ),
+ )
+ EmptyOperator(task_id="task1", dag=dag)
+ scheduler_dag = sync_dag_to_db(dag, session=session)
+ scheduler_dag.create_dagrun(
+ run_id="test1",
+ run_after=DEFAULT_DATE,
+ state=DagRunState.QUEUED,
+ logical_date=DEFAULT_DATE,
+ data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ triggered_by=DagRunTriggeredByType.TEST,
+ run_type=DagRunType.MANUAL,
+ )
+ session.commit()
+
+ orig_serdag = session.scalar(select(SDM).where(SDM.dag_id == dag_id).order_by(SDM.created_at.desc()))
+ orig_hash = orig_serdag.dag_hash
+ orig_alert = session.scalar(select(DAM).where(DAM.serialized_dag_id == orig_serdag.id))
+ orig_uuid = orig_alert.id
+
+ # Change only the name — reference, interval, and callback are identical.
+ dag.deadline = DeadlineAlert(
+ reference=DeadlineReference.DAGRUN_QUEUED_AT,
+ interval=timedelta(minutes=5),
+ callback=AsyncCallback(empty_callback_for_deadline),
+ name="updated name",
+ )
+
+ did_write = SDM.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name="testing", session=session)
+ session.commit()
+
+ # write_dag must report True because a DB write (name UPDATE) did occur.
+ assert did_write is True
+
+ serdag_count = session.scalar(select(func.count()).select_from(SDM).where(SDM.dag_id == dag_id))
+ latest_serdag = session.scalar(
+ select(SDM).where(SDM.dag_id == dag_id).order_by(SDM.created_at.desc())
+ )
+ updated_alert = session.scalar(select(DAM).where(DAM.id == orig_uuid))
+
+ # No new SerializedDagModel row — UUID was reused so the hash is unchanged.
+ assert serdag_count == 1
+ assert latest_serdag.dag_hash == orig_hash
+
+ # The DeadlineAlert row must still use the same UUID.
+ assert updated_alert is not None
+ assert updated_alert.id == orig_uuid
+
+ # The name must have been updated in the DB.
+ assert updated_alert.name == "updated name"
diff --git a/contributing-docs/12_provider_distributions.rst b/contributing-docs/12_provider_distributions.rst
index 523ad2a8fca..8e376f61b01 100644
--- a/contributing-docs/12_provider_distributions.rst
+++ b/contributing-docs/12_provider_distributions.rst
@@ -117,6 +117,49 @@ you should do it before you make a PR with such changed dependency changes
Also, you should rebuild the image ``breeze ci-image build`` or answer ``y`` when you are asked to rebuild the
image for the new dependencies to be used in the Breeze CI environment.
+Non-default provider extras
+---------------------------
+
+Some providers depend on packages that cannot be installed in CI by default — for example a
+proprietary client library (IBM MQ's ``ibmmq``) or a native library that requires system packages
+(Google's ``leveldb``/``plyvel`` needs ``libleveldb-dev``). Pulling these into the default
+``uv sync`` would break CI on every runner that doesn't have the prerequisite installed.
+
+For these cases, declare the dependency as an extra on the provider and register it as its own
+group at the root, without adding it to ``dev``:
+
+1. **In the provider's ``pyproject.toml``** — keep the package under ``[project.optional-dependencies]``
+ so users can opt in with ``pip install apache-airflow-providers-<id>[<extra>]``.
+
+2. **In the root ``pyproject.toml``** — add a dedicated entry under ``[dependency-groups]``,
+ then include it from the ``ci-image`` aggregate group. The ``ci-image`` group is the single
+ source of truth referenced by ``Dockerfile``, ``Dockerfile.ci``,
+ ``scripts/docker/install_airflow_when_building_images.sh``, and the breeze constraints
+ checker — adding your group there is the only change required for the CI image to pick it up:
+
+ .. code:: toml
+
+ [dependency-groups]
+ my-non-default-extra = [
+ "some-package>=1.2.3",
+ ]
+
+ ci-image = [
+ {include-group = "dev"},
+ {include-group = "docs"},
+ {include-group = "docs-gen"},
+ {include-group = "leveldb"},
+ {include-group = "my-non-default-extra"},
+ ]
+
+3. **If system libraries are required**, add them to ``scripts/docker/install_os_dependencies.sh``
+ so the CI image has the prerequisites before ``uv sync`` runs.
+
+Because the new group is *not* part of ``dev``, a plain ``uv sync`` on a contributor's machine
+will not try to install it. The CI image installs it via ``ci-image``; provider unit tests that
+import the proprietary or hard-to-build module should mock it (see ``providers/google/leveldb``
+for the established pattern).
+
Provider's cross-dependencies
-----------------------------
diff --git a/dev/breeze/src/airflow_breeze/utils/constraints_version_check.py b/dev/breeze/src/airflow_breeze/utils/constraints_version_check.py
index 75c5ee22af5..ced0564d855 100755
--- a/dev/breeze/src/airflow_breeze/utils/constraints_version_check.py
+++ b/dev/breeze/src/airflow_breeze/utils/constraints_version_check.py
@@ -593,10 +593,10 @@ def explain_package_upgrade(
additional_args = []
if airflow_constraints_mode == "constraints-source-providers":
# In case of source constraints we also need to add all development dependencies
- # to reflect exactly what is installed in the CI image by default
- additional_args.extend(
- ["--group", "dev", "--group", "docs", "--group", "docs-gen", "--group", "leveldb"]
- )
+ # to reflect exactly what is installed in the CI image by default. The ``ci-image``
+ # group aggregates dev/docs/docs-gen plus any hard-to-install provider extras
+ # (see root pyproject.toml).
+ additional_args.extend(["--group", "ci-image"])
with (
preserve_pyproject_file(AIRFLOW_ROOT_PATH / "pyproject.toml") as airflow_pyproject,
preserve_pyproject_file(AIRFLOW_ROOT_PATH / "uv.lock"),
diff --git a/pyproject.toml b/pyproject.toml
index e9ee8bc533c..755f5cbd231 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1343,6 +1343,19 @@ leveldb = [
"plyvel>=1.5.1"
]
+# Aggregate of every group the CI image installs by default. Single source of
+# truth referenced from the Dockerfiles, the image install script, and the
+# breeze constraints checker. When a provider needs to register a
+# hard-to-install extra (proprietary client, needs system libs, etc.), add a
+# dedicated [dependency-groups] entry above and include it here — no Dockerfile
# or breeze edits required. See contributing-docs/12_provider_distributions.rst.
+ci-image = [
+ {include-group = "dev"},
+ {include-group = "docs"},
+ {include-group = "docs-gen"},
+ {include-group = "leveldb"},
+]
+
[tool.uv]
# Bump this only when the project actually relies on a newer uv feature/fix. It is a
# minimum contributors must install, NOT the uv CI pins to — keeping it in lockstep
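`ci-image` relies on the `include-group` tables defined by PEP 735. When `uv sync --group ci-image` runs, each `{include-group = "..."}` entry is replaced by the requirements of the named group. The sketch below is an illustrative re-implementation of that expansion, not uv's code. It works on the dict that `tomllib.load` would return for the `[dependency-groups]` table, normalizes group names before comparing them, and rejects include cycles. The `dev`, `docs` and `docs-gen` contents are placeholders, not Airflow's real groups.

```python
from __future__ import annotations

import re


def normalize(name: str) -> str:
    # PEP 503-style name normalization, applied to group names before lookup.
    return re.sub(r"[-_.]+", "-", name).lower()


def expand_group(groups: dict[str, list], name: str, _stack: tuple[str, ...] = ()) -> list[str]:
    """Flatten one dependency group, recursively resolving {include-group = ...} entries."""
    table = {normalize(k): v for k, v in groups.items()}
    key = normalize(name)
    if key in _stack:
        raise ValueError(f"cycle in dependency groups: {' -> '.join((*_stack, key))}")
    if key not in table:
        raise KeyError(f"unknown dependency group: {name}")
    requirements: list[str] = []
    for entry in table[key]:
        if isinstance(entry, str):
            requirements.append(entry)  # a plain requirement string
        else:
            # An include table pulls in every requirement of the referenced group.
            requirements.extend(expand_group(groups, entry["include-group"], (*_stack, key)))
    return requirements


groups = {
    "dev": ["pytest"],  # placeholder contents
    "docs": ["sphinx"],
    "docs-gen": ["sphinx-autoapi"],
    "leveldb": ["plyvel>=1.5.1"],
    "my-non-default-extra": ["some-package>=1.2.3"],
    "ci-image": [
        {"include-group": "dev"},
        {"include-group": "docs"},
        {"include-group": "docs-gen"},
        {"include-group": "leveldb"},
        {"include-group": "my-non-default-extra"},
    ],
}
print(expand_group(groups, "ci-image"))
# ['pytest', 'sphinx', 'sphinx-autoapi', 'plyvel>=1.5.1', 'some-package>=1.2.3']
```

Every consumer (both Dockerfiles, the image install script, and the breeze constraints checker) now requests `ci-image` alone. Adding one `include-group` line is therefore enough for all of them to pick up a new extra.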
diff --git a/scripts/docker/install_airflow_when_building_images.sh b/scripts/docker/install_airflow_when_building_images.sh
index 85f879786f3..772465678a8 100644
--- a/scripts/docker/install_airflow_when_building_images.sh
+++ b/scripts/docker/install_airflow_when_building_images.sh
@@ -57,8 +57,8 @@ function install_from_sources() {
# (binary lxml embeds its own libxml2, while xmlsec uses system one).
# See https://bugs.launchpad.net/lxml/+bug/2110068
set -x
- uv sync --all-packages --resolution highest --group dev --group docs --group docs-gen \
- --group leveldb ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
+ uv sync --all-packages --resolution highest --group ci-image \
+ ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
--no-python-downloads --no-managed-python
else
set +x
@@ -70,8 +70,8 @@ function install_from_sources() {
# libxml2 (binary lxml embeds its own libxml2, while xmlsec uses system one).
# See https://bugs.launchpad.net/lxml/+bug/2110068
set -x
- if ! uv sync --all-packages --frozen --group dev --group docs --group docs-gen \
- --group leveldb ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
+ if ! uv sync --all-packages --frozen --group ci-image \
+ ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
--no-python-downloads --no-managed-python; then
set +x
if [[ ${AIRFLOW_FALLBACK_NO_CONSTRAINTS_INSTALLATION} != "true" ]]; then
@@ -86,8 +86,8 @@ function install_from_sources() {
echo "${COLOR_BLUE}Falling back to re-resolving dependencies (uv sync without --frozen).${COLOR_RESET}"
echo
set -x
- uv sync --all-packages --group dev --group docs --group docs-gen \
- --group leveldb ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
+ uv sync --all-packages --group ci-image \
+ ${extra_sync_flags} --no-binary-package lxml --no-binary-package xmlsec \
--no-python-downloads --no-managed-python
set +x
fi
diff --git a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
index c8db3ee02f9..b7b8effe8dc 100644
--- a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
+++ b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
@@ -30,6 +30,7 @@ from opentelemetry.sdk.metrics._internal.export import (
ConsoleMetricExporter,
PeriodicExportingMetricReader,
)
+from opentelemetry.sdk.metrics.view import ExponentialBucketHistogramAggregation, View
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from ..common import get_otel_data_exporter
@@ -146,7 +147,8 @@ class _OtelTimer(Timer):
"""
An implementation of Stats.Timer() which records the result in the OTel Metrics Map.
- OpenTelemetry does not have a native timer, we will store the values as a Gauge.
+ OpenTelemetry does not have a native timer; values are stored as a Histogram so that
+ all observations (count, sum, bucket distribution) are preserved across multiple recordings.
:param name: The name of the timer.
:param tags: Tags to append to the timer.
@@ -160,9 +162,9 @@ class _OtelTimer(Timer):
def stop(self, send: bool = True) -> None:
super().stop(send)
- if self.name and send and self.duration:
- self.otel_logger.metrics_map.set_gauge_value(
- full_name(prefix=self.otel_logger.prefix, name=self.name), self.duration, False, self.tags
+ if self.name and send and self.duration is not None:
+ self.otel_logger.metrics_map.record_histogram_value(
+ full_name(prefix=self.otel_logger.prefix, name=self.name), self.duration, self.tags
)
@@ -278,11 +280,11 @@ class SafeOtelLogger:
*,
tags: Attributes = None,
) -> None:
- """OTel does not have a native timer, stored as a Gauge whose value is elapsed ms."""
+ """Record a timing observation as a Histogram to preserve distribution information."""
if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
if isinstance(dt, datetime.timedelta):
dt = dt.total_seconds() * 1000.0
- self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, name=stat), float(dt), False, tags)
+ self.metrics_map.record_histogram_value(full_name(prefix=self.prefix, name=stat), float(dt), tags)
def timer(
self,
@@ -314,15 +316,29 @@ class InternalGauge:
self.gauge.set(new_value, attributes=self.attributes)
+class InternalHistogram:
+ """Stores a histogram instrument for timer/timing metrics."""
+
+ def __init__(self, meter, name: str):
+ otel_safe_name = _get_otel_safe_name(name)
+ self.histogram = meter.create_histogram(name=otel_safe_name, unit="ms")
+ log.debug("Created %s as type: %s", otel_safe_name, _type_as_str(self.histogram))
+
+ def record(self, value: float, tags: Attributes) -> None:
+ self.histogram.record(value, attributes=tags)
+
+
class MetricsMap:
"""Stores Otel Instruments."""
def __init__(self, meter):
self.meter = meter
self.map = {}
+ self.histograms: dict[str, InternalHistogram] = {}
def clear(self) -> None:
self.map.clear()
+ self.histograms.clear()
def _create_counter(self, name):
"""Create a new counter or up_down_counter for the provided name."""
@@ -376,6 +392,21 @@ class MetricsMap:
self.map[key].set_value(value, delta)
+ def record_histogram_value(self, name: str, value: float, tags: Attributes) -> None:
+ """
+ Record a timing observation in a Histogram instrument.
+
+ Unlike a Gauge, a Histogram accumulates all observations so that count, sum,
+ and bucket distribution are preserved across multiple recordings.
+
+ :param name: The name of the histogram to record.
+ :param value: The timing observation in milliseconds.
+ :param tags: Attributes to attach to the observation.
+ """
+ if name not in self.histograms:
+ self.histograms[name] = InternalHistogram(meter=self.meter, name=name)
+ self.histograms[name].record(value, tags)
+
def flush_otel_metrics():
provider = metrics.get_meter_provider()
@@ -400,6 +431,15 @@ def get_otel_logger(
stat_name_handler: Callable[[str], str] | None = None,
statsd_influxdb_enabled: bool = False,
) -> SafeOtelLogger:
+ """
+ Build and return a :class:`SafeOtelLogger` backed by a configured :class:`MeterProvider`.
+
+ Histogram instruments (used for ``timing()`` / ``timer()`` metrics) are aggregated with
+ :class:`~opentelemetry.sdk.metrics.view.ExponentialBucketHistogramAggregation`
+ so that bucket boundaries adapt automatically to the observed data range. This avoids
+ the need to hand-tune explicit bucket boundaries for metrics that span very different
+ scales (milliseconds to hours).
+ """
otel_env_config = load_metrics_env_config()
effective_service_name: str = otel_env_config.service_name or service_name or "airflow"
@@ -453,6 +493,12 @@ def get_otel_logger(
MeterProvider(
resource=resource,
metric_readers=readers,
+ views=[
+ View(
+ instrument_type=metrics.Histogram,
+ aggregation=ExponentialBucketHistogramAggregation(),
+ )
+ ],
shutdown_on_exit=False,
),
)
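The new `get_otel_logger` docstring says exponential buckets "adapt automatically to the observed data range". A stdlib-only sketch of the idea behind that, following the OpenTelemetry exponential histogram data model rather than the SDK's actual code: at scale `s` the bucket base is `2 ** (2 ** -s)`, and a positive value `v` falls into bucket `i` with `base**i < v <= base**(i+1)`. The helper names `bucket_index` and `bucket_bounds` are hypothetical. The real aggregation also lowers the scale when more than `max_size` buckets (160 by default) would be needed; this sketch keeps the scale fixed and ignores zero and negative values.

```python
import math
from collections import Counter
from typing import Tuple


def bucket_index(value: float, scale: int) -> int:
    """Return i such that base**i < value <= base**(i + 1), with base = 2 ** (2 ** -scale)."""
    if value <= 0:
        raise ValueError("this sketch only handles positive values")
    # log_base(value) == log2(value) * 2**scale. Floating-point error can misplace values
    # lying exactly on a non-power-of-two boundary; the spec describes exact methods.
    return math.ceil(math.log2(value) * 2.0 ** scale) - 1


def bucket_bounds(index: int, scale: int) -> Tuple[float, float]:
    """Return the (exclusive lower, inclusive upper) bounds of bucket ``index``."""
    base = 2.0 ** (2.0 ** -scale)
    return base ** index, base ** (index + 1)


# Timings in ms, spanning sub-millisecond to about an hour.
timings_ms = [0.8, 12.0, 250.0, 3_140.0, 3_600_000.0]
# At scale 0 each bucket is twice as wide as the previous one, so about two dozen
# buckets cover this whole range without any hand-picked boundaries.
counts = Counter(bucket_index(v, scale=0) for v in timings_ms)
```

Higher scales give finer buckets (scale 3 splits each doubling into 8), which is the resolution the SDK trades away when it downscales to stay within `max_size`.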
diff --git a/shared/observability/tests/observability/metrics/test_otel_logger.py b/shared/observability/tests/observability/metrics/test_otel_logger.py
index f7b348354d7..3c1369ec44b 100644
--- a/shared/observability/tests/observability/metrics/test_otel_logger.py
+++ b/shared/observability/tests/observability/metrics/test_otel_logger.py
@@ -25,6 +25,7 @@ from unittest import mock
import pytest
from opentelemetry.metrics import MeterProvider
+from opentelemetry.sdk.metrics.view import ExponentialBucketHistogramAggregation, View
from airflow_shared.observability.common import get_otel_data_exporter
from airflow_shared.observability.exceptions import InvalidStatsNameException
@@ -244,25 +245,28 @@ class TestOtelMetrics:
self.stats.timing(name, dt=datetime.timedelta(seconds=123))
- self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
- expected_value = 123000.0
- assert self.map[full_name(name)].value == expected_value
+ self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name), unit="ms")
+ self.meter.get_meter().create_histogram.return_value.record.assert_called_once_with(
+ 123000.0, attributes=None
+ )
def test_timing_new_metric_with_tags(self, name):
tags = {"hello": "world"}
- key = _generate_key_name(full_name(name), tags)
self.stats.timing(name, dt=1, tags=tags)
- self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
- self.map[key].attributes == tags
+ self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name), unit="ms")
+ self.meter.get_meter().create_histogram.return_value.record.assert_called_once_with(
+ 1.0, attributes=tags
+ )
def test_timing_existing_metric(self, name):
self.stats.timing(name, dt=1)
self.stats.timing(name, dt=2)
- self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
- assert self.map[full_name(name)].value == 2
+ # histogram created only once, but both observations are recorded
+ self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name), unit="ms")
+ assert self.meter.get_meter().create_histogram.return_value.record.call_count == 2
# For the four test_timer_foo tests below:
# time.perf_count() is called once to get the starting timestamp and again
@@ -277,7 +281,7 @@ class TestOtelMetrics:
expected_duration = 3140.0
assert timer.duration == expected_duration
assert mock_time.call_count == 2
- self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
+ self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name), unit="ms")
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
def test_timer_no_name_returns_float_but_does_not_store_value(self, mock_time, name):
@@ -288,7 +292,7 @@ class TestOtelMetrics:
expected_duration = 3140.0
assert timer.duration == expected_duration
assert mock_time.call_count == 2
- self.meter.get_meter().create_gauge.assert_not_called()
+ self.meter.get_meter().create_histogram.assert_not_called()
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
def test_timer_start_and_stop_manually_send_false(self, mock_time, name):
@@ -301,7 +305,7 @@ class TestOtelMetrics:
expected_value = 3140.0
assert timer.duration == expected_value
assert mock_time.call_count == 2
- self.meter.get_meter().create_gauge.assert_not_called()
+ self.meter.get_meter().create_histogram.assert_not_called()
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
def test_timer_start_and_stop_manually_send_true(self, mock_time, name):
@@ -314,7 +318,7 @@ class TestOtelMetrics:
expected_value = 3140.0
assert timer.duration == expected_value
assert mock_time.call_count == 2
- self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
+ self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name), unit="ms")
@pytest.mark.parametrize(
(
@@ -415,6 +419,18 @@ class TestOtelMetrics:
== f"opentelemetry.exporter.otlp.proto.{expected_exporter_module}.metric_exporter"
)
+ @mock.patch("airflow_shared.observability.metrics.otel_logger.metrics")
+ @mock.patch("airflow_shared.observability.metrics.otel_logger.MeterProvider")
+ def test_get_otel_logger_uses_exponential_histogram_view(self, mock_provider, mock_metrics):
+ get_otel_logger(host="localhost", port=4318)
+
+ call_kwargs = mock_provider.call_args.kwargs
+ views = call_kwargs["views"]
+ assert len(views) == 1
+ view = views[0]
+ assert isinstance(view, View)
+ assert isinstance(view._aggregation, ExponentialBucketHistogramAggregation)
+
def test_atexit_flush_on_process_exit(self):
"""
Run a process that initializes a logger, creates a stat and then exits.
@@ -422,8 +438,11 @@ class TestOtelMetrics:
The logger initialization registers an atexit hook.
Test that the hook runs and flushes the created stat at shutdown.
"""
- test_module_name = "tests.observability.metrics.test_otel_logger"
- function_call_str = f"import {test_module_name} as m; m.mock_service_run()"
+ function_call_str = (
+ "from airflow_shared.observability.metrics.otel_logger import get_otel_logger; "
+ "logger = get_otel_logger(debug=True); "
+ "logger.incr('my_test_stat')"
+ )
proc = subprocess.run(
[sys.executable, "-c", function_call_str],
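`test_timing_existing_metric` now asserts that `create_histogram` runs once while `record` runs twice. The stdlib-only sketch below reproduces that create-once, record-many pattern, with a `MagicMock` standing in for an OTel `Meter`. `TimingMap` is a hypothetical, trimmed-down stand-in for `MetricsMap`, and `LastValueGauge` is a hypothetical class included only to contrast what a last-value Gauge keeps.

```python
from typing import Optional
from unittest import mock


class LastValueGauge:
    """Keeps only the latest observation, like the old Gauge-based timers."""

    def __init__(self) -> None:
        self.value: Optional[float] = None

    def set(self, value: float) -> None:
        self.value = value  # earlier observations are overwritten


class TimingMap:
    """Hypothetical, trimmed-down MetricsMap: one histogram per name, created lazily."""

    def __init__(self, meter) -> None:
        self.meter = meter
        self.histograms: dict = {}

    def record(self, name: str, value_ms: float, tags=None) -> None:
        if name not in self.histograms:
            # create_histogram runs only for the first observation of a given name
            self.histograms[name] = self.meter.create_histogram(name=name, unit="ms")
        # tags travel with each observation as attributes, not as part of the key
        self.histograms[name].record(value_ms, attributes=tags)


meter = mock.MagicMock()
timings = TimingMap(meter)
gauge = LastValueGauge()
for observation in (1.0, 2.0):
    timings.record("example.timing", observation)
    gauge.set(observation)
```

Because the histogram is keyed by name alone, one instrument serves every tag combination and keeps both observations, whereas the gauge ends up holding only the last value.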
diff --git a/task-sdk/src/airflow/sdk/definitions/deadline.py b/task-sdk/src/airflow/sdk/definitions/deadline.py
index 2fe220e789d..7164eeec076 100644
--- a/task-sdk/src/airflow/sdk/definitions/deadline.py
+++ b/task-sdk/src/airflow/sdk/definitions/deadline.py
@@ -145,9 +145,11 @@ class DeadlineAlert:
reference: DeadlineReferenceType,
interval: timedelta,
callback: Callback,
+ name: str | None = None,
):
self.reference = reference
self.interval = interval
+ self.name = name
if not isinstance(callback, (AsyncCallback, SyncCallback)):
raise ValueError(f"Callbacks of type {type(callback).__name__} are not currently supported")
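The `DeadlineAlert` hunk adds an optional `name` keyword that defaults to `None`, so existing call sites keep working unchanged. A minimal stand-in showing that backwards-compatible signature together with the callback type check visible in the hunk. `DeadlineAlertSketch`, `SyncCallbackStub` and `AsyncCallbackStub` are hypothetical, the reference is a plain string instead of a `DeadlineReferenceType`, and storing the callback after validation is an assumption of this sketch.

```python
from datetime import timedelta
from typing import Optional


class SyncCallbackStub:
    """Hypothetical placeholder for a supported synchronous callback type."""


class AsyncCallbackStub:
    """Hypothetical placeholder for a supported asynchronous callback type."""


class DeadlineAlertSketch:
    def __init__(
        self,
        reference: str,
        interval: timedelta,
        callback: object,
        name: Optional[str] = None,
    ):
        self.reference = reference
        self.interval = interval
        self.name = name  # new and optional: omitting it keeps the previous behaviour
        if not isinstance(callback, (AsyncCallbackStub, SyncCallbackStub)):
            raise ValueError(f"Callbacks of type {type(callback).__name__} are not currently supported")
        self.callback = callback  # assumption: the sketch keeps the validated callback
```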
diff --git a/task-sdk/src/airflow/sdk/definitions/taskgroup.py b/task-sdk/src/airflow/sdk/definitions/taskgroup.py
index 97d97148170..25bb3e5dd81 100644
--- a/task-sdk/src/airflow/sdk/definitions/taskgroup.py
+++ b/task-sdk/src/airflow/sdk/definitions/taskgroup.py
@@ -141,6 +141,10 @@ class TaskGroup(DAGNode):
if not dag:
raise RuntimeError("TaskGroup can only be used inside a dag")
+ def __getitem__(self, key: str) -> DAGNode:
+ """Return a child node by label via ``tg[label]`` syntax. See `get_child_by_label`."""
+ return self.get_child_by_label(key)
+
def __attrs_post_init__(self):
# TODO: If attrs supported init only args we could use that here
# https://github.com/python-attrs/attrs/issues/342
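The new `__getitem__` delegates to `get_child_by_label`, so `tg["task1"]` and `tg.get_child_by_label("task1")` are interchangeable, and the accompanying test expects a `KeyError` for an unknown label. The sketch below models that delegation with a hypothetical `GroupSketch` class. Its label-to-id rule (prefix the group id when `prefix_group_id` is true) is an assumption about how child ids are formed, not a copy of the Task SDK code.

```python
class GroupSketch:
    """Hypothetical stand-in for TaskGroup holding children keyed by full node id."""

    def __init__(self, group_id: str, prefix_group_id: bool = True) -> None:
        self.group_id = group_id
        self.prefix_group_id = prefix_group_id
        self.children: dict = {}

    def child_id(self, label: str) -> str:
        # Assumed rule: prefix the label with the group id when prefixing is on.
        return f"{self.group_id}.{label}" if self.prefix_group_id else label

    def add(self, label: str, node: object) -> None:
        self.children[self.child_id(label)] = node

    def get_child_by_label(self, label: str) -> object:
        # A plain dict lookup, so an unknown label raises KeyError.
        return self.children[self.child_id(label)]

    def __getitem__(self, key: str) -> object:
        """Support ``group[label]`` by delegating to get_child_by_label."""
        return self.get_child_by_label(key)
```

Keeping `__getitem__` a one-line delegate means the subscript syntax can never diverge from `get_child_by_label`, including its error behaviour.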
diff --git a/task-sdk/tests/task_sdk/definitions/test_taskgroup.py b/task-sdk/tests/task_sdk/definitions/test_taskgroup.py
index 73cd272bb9a..2da0b079927 100644
--- a/task-sdk/tests/task_sdk/definitions/test_taskgroup.py
+++ b/task-sdk/tests/task_sdk/definitions/test_taskgroup.py
@@ -223,6 +223,31 @@ def test_build_task_group_with_prefix():
assert group4.get_child_by_label("task4") == task4
+@pytest.mark.parametrize(
+ "prefix",
+ [
+ pytest.param(True, id="prefix_on"),
+ pytest.param(False, id="prefix_off"),
+ ],
+)
+def test_taskgroup_getitem_returns_child_by_label(prefix: bool):
+ """Tests that TaskGroup[label] returns the correct child task or subgroup."""
+ logical_date = pendulum.datetime(2020, 1, 1)
+ with DAG("test_getitem", start_date=logical_date):
+ with TaskGroup("group1", prefix_group_id=prefix) as group1:
+ task1 = EmptyOperator(task_id="task1")
+ with TaskGroup("subgroup", prefix_group_id=prefix) as subgroup:
+ task2 = EmptyOperator(task_id="task2")
+
+ assert group1["task1"] == task1
+ assert group1["subgroup"] == subgroup
+ assert group1["subgroup"]["task2"] == task2
+
+ # Missing label raises KeyError
+ with pytest.raises(KeyError):
+ group1["nonexistent"]
+
+
def test_build_task_group_with_prefix_functionality():
"""
Tests TaskGroup prefix_group_id functionality - additional test for comprehensive coverage.