kacpermuda commented on code in PR #66992: URL: https://github.com/apache/airflow/pull/66992#discussion_r3324464714
########## providers/openlineage/src/airflow/providers/openlineage/utils/emission_policy.py: ########## @@ -0,0 +1,1066 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Per-scope emission policy resolution for the OpenLineage provider. + +The ``emission_policy`` Airflow configuration option (``[openlineage]`` section) +accepts a JSON array of rule objects. Each rule has the shape:: + + { + "scope": {<scope keys>}, # required, may be empty for "global" + "match_mode": "exact" | "regex", # optional, default "exact" + "controls": {<control flags>}, # required, must be non-empty + "locked": true | false # optional, default false + } + +Top-level keys outside the four above are rejected with a WARNING and the rule +is skipped. + +Scope keys (all optional inside ``scope``): + +- ``operator`` — fully-qualified operator class name; matches task events for that + operator type. +- ``dag_id`` — matches task and/or dag run events for all tasks / dag runs belonging + to the named DAG. Use ``emit_task_events`` / ``emit_dag_events`` to target one + event type selectively. +- ``task_id`` — only valid alongside ``dag_id``; targets a specific task. +- *(empty ``scope: {}``)* — global default override; applies to every task and DAG event. 
+ +``operator`` cannot be combined with ``dag_id`` or ``task_id`` (scope is one of: +global, operator-only, dag-only, dag+task). Rules with ``task_id`` but no ``dag_id``, and +rules that combine ``operator`` or ``task_id`` with ``emit_dag_events``, are also +rejected with a WARNING. + +``match_mode`` lives at the top level: ``"exact"`` (default) or ``"regex"``. +When set to ``"regex"``, every value inside ``scope`` (``dag_id``, ``task_id``, +``operator``) is treated as a ``re.fullmatch`` pattern. + +Control flag keys (all optional inside ``controls``; the dict must be non-empty): + +- ``emit`` — shorthand covering ALL OpenLineage events in scope (both task and dag + run events); set to ``false`` to disable them. Default: ``true``. +- ``emit_task_events`` — controls task-level events only (``false`` disables them); takes + precedence over ``emit`` for task event decisions. Default: ``true``. +- ``emit_dag_events`` — controls dag-run-level events only (``false`` disables them); takes + precedence over ``emit`` for dag event decisions. Default: ``true``. +- ``extract_operator_metadata`` — whether to run operator-specific extractor-based metadata + collection. When ``true``, the extractor manager calls the operator's OpenLineage extractor + (if registered), which may produce dataset inputs/outputs, job facets, run facets, and + other operator-specific metadata. When ``false``, the entire extraction pipeline is skipped + and a minimal event is emitted. Only meaningful for task events. Default: ``true``. +- ``include_source_code`` — whether to include operator source code in the + ``SourceCodeJobFacet`` for Python and Bash operators. Only meaningful for task events + when ``extract_operator_metadata`` is also ``true``. Default: ``true``. +- ``hook_lineage`` — whether to use ``HookLineageCollector`` as a fallback when the + extractor finds no inputs/outputs. **Has no effect when ``extract_operator_metadata`` + is ``false``** — the entire extraction pipeline (including hook lineage) is skipped. + Only meaningful for task events. 
Default: ``true``. +- ``include_full_task_info`` — whether to include the full serialised operator state + in the ``AirflowRunFacet``. When ``false`` (default), only a curated subset of + task attributes is sent. Only meaningful for task events. Default: ``false``. + +``locked: true`` is an admin floor lock that prevents per-Dag / per-task authoring +overrides from changing the field(s) carried by this rule's ``controls`` dict. + +Flag hierarchy +~~~~~~~~~~~~~~ + +Flags are not fully independent — some only take effect when a higher-level flag is +enabled: + +- ``extract_operator_metadata: false`` skips the **entire** operator extraction pipeline. + When this is ``false``, both ``include_source_code`` and ``hook_lineage`` have **no + effect** regardless of their values, because the code paths they control are never + reached. +- ``include_source_code`` only applies inside Python and Bash operator extractors; setting + it to ``false`` on other operator types is a no-op. + +Priority for **task events** (most specific tier wins; within a tier, last matching +rule wins): + +1. ``dag_id`` + ``task_id`` +2. ``dag_id`` +3. ``operator`` +4. Global (no match keys) +5. Built-in defaults + +For ``emit`` at the task level: ``emit_task_events`` beats ``emit`` within the same +rule. + +Priority for **DAG run events**: + +1. ``dag_id`` rule (using ``emit_dag_events`` or ``emit``) +2. Global (no match keys) +3. Built-in defaults + +For ``emit`` at the dag level: ``emit_dag_events`` beats ``emit`` within the same +rule. + +Legacy config translation +------------------------- + +Any active legacy config option (``disabled_for_operators``, ``disable_source_code``, +``include_full_task_info``, ``selective_enable``) is **always** translated into equivalent +``emission_policy`` rules, regardless of whether ``emission_policy`` itself is set. The +translated rules are prepended before any user-provided rules, so user rules win within +each priority tier (last-wins). 
A ``DeprecationWarning`` is issued listing every translated +option — silence it by migrating those options into ``emission_policy`` exclusively. + +When no legacy option is active and ``emission_policy`` is empty, the resolver returns +the built-in defaults with no warnings. + +Audit logging +------------- + +Every resolved field that is non-default — whether from a rule or from a legacy +translation — is logged at INFO level, identifying the field, the event context, +and the exact rule that caused the change. +""" + +from __future__ import annotations + +import logging +import re +import warnings +from dataclasses import dataclass, replace +from typing import TYPE_CHECKING + +from airflow.exceptions import AirflowProviderDeprecationWarning +from airflow.providers.openlineage import conf as _ol_conf + +if TYPE_CHECKING: + from airflow.providers.openlineage.utils.utils import AnyOperator + +log = logging.getLogger(__name__) + +# Sentinel used during field resolution — distinct from ``False``. +_UNSET = object() + +# Control flag names — the keys that may appear inside a rule's ``controls`` dict. +EMIT = "emit" +EMIT_TASK_EVENTS = "emit_task_events" +EMIT_DAG_EVENTS = "emit_dag_events" +EXTRACT_OPERATOR_METADATA = "extract_operator_metadata" +INCLUDE_SOURCE_CODE = "include_source_code" +HOOK_LINEAGE = "hook_lineage" +INCLUDE_FULL_TASK_INFO = "include_full_task_info" + +# Scope keys — the keys that may appear inside a rule's ``scope`` dict. +SCOPE_DAG_ID = "dag_id" +SCOPE_TASK_ID = "task_id" +SCOPE_OPERATOR = "operator" + +# Top-level rule keys. +RULE_SCOPE = "scope" +RULE_CONTROLS = "controls" +RULE_MATCH_MODE = "match_mode" +RULE_LOCKED = "locked" + +# Param names used by the authoring API to store flow-control flags on operator / DAG +# objects. Kept in this module (rather than the api module) so the resolver can read +# them without a circular import. 
+OL_EMISSION_POLICY_TASK_PARAM = "_ol_emission_policy" +OL_EMISSION_POLICY_DAG_PARAM = "_ol_emission_policy_dag" + +# Schema enforcement: only these keys may appear at each level. Unknown keys cause +# the rule to be skipped with a WARNING (catches typos like {"scope": {"dgg_id": "x"}}). +_ALLOWED_TOP_LEVEL_KEYS: frozenset[str] = frozenset({RULE_SCOPE, RULE_MATCH_MODE, RULE_CONTROLS, RULE_LOCKED}) +_ALLOWED_SCOPE_KEYS: frozenset[str] = frozenset({SCOPE_DAG_ID, SCOPE_TASK_ID, SCOPE_OPERATOR}) +_ALLOWED_CONTROL_KEYS: frozenset[str] = frozenset( + { + EMIT, + EMIT_TASK_EVENTS, + EMIT_DAG_EVENTS, + EXTRACT_OPERATOR_METADATA, + INCLUDE_SOURCE_CODE, + HOOK_LINEAGE, + INCLUDE_FULL_TASK_INFO, + } +) + +# Fields scanned for ``locked: true`` on task-event resolution. +# ``emit`` is intentionally absent: emit-locking is driven by either ``emit`` or +# ``emit_task_events`` appearing in a locked rule (handled separately in +# :func:`_compute_locked_task_fields`), so listing it here would double-count. +_LOCKABLE_TASK_FIELDS: tuple[str, ...] = ( + EXTRACT_OPERATOR_METADATA, + INCLUDE_SOURCE_CODE, + HOOK_LINEAGE, + INCLUDE_FULL_TASK_INFO, +) + +# Authoring flag classification: which keys are relevant for task-level vs DAG-run +# resolution. Used by the authoring API to split a single call into the two stored +# param dicts. +_TASK_FLAG_KEYS: frozenset[str] = frozenset( + { + EMIT, + EMIT_TASK_EVENTS, + EXTRACT_OPERATOR_METADATA, + INCLUDE_SOURCE_CODE, + HOOK_LINEAGE, + INCLUDE_FULL_TASK_INFO, + } +) +_DAG_FLAG_KEYS: frozenset[str] = frozenset({EMIT, EMIT_DAG_EVENTS}) + +# Audit-log direction tables. Fields default-true → log on transition to false; +# fields default-false → log on transition to true. 
+_AUDIT_DISABLE_FIELDS: frozenset[str] = frozenset( + {EMIT, EXTRACT_OPERATOR_METADATA, INCLUDE_SOURCE_CODE, HOOK_LINEAGE} +) +_AUDIT_ENABLE_FIELDS: frozenset[str] = frozenset({INCLUDE_FULL_TASK_INFO}) + + +@dataclass(frozen=True) +class EmissionPolicy: + """Resolved emission policy for a specific event context.""" + + emit: bool + extract_operator_metadata: bool + include_source_code: bool + hook_lineage: bool + include_full_task_info: bool + + @classmethod + def defaults(cls) -> EmissionPolicy: + """Return the default policy (all controls at their built-in defaults).""" + return cls( + emit=True, + extract_operator_metadata=True, + include_source_code=True, + hook_lineage=True, + include_full_task_info=False, + ) + + +def _matches(pattern: str, value: str, match_mode: str) -> bool: + """Return ``True`` if *value* matches *pattern* according to *match_mode*.""" + if match_mode == "regex": + return bool(re.fullmatch(pattern, value)) + return pattern == value + + +def _read_param(obj: object, param_name: str) -> dict[str, bool]: + """Read the flags dict stored at *param_name* on *obj*, or ``{}`` if absent.""" + from airflow.providers.common.compat.sdk import Param + + val = getattr(obj, "params", {}).get(param_name) + if val is None: + return {} + if isinstance(val, Param): + val = val.value + return val if isinstance(val, dict) else {} + + +def _merge_param(obj: object, param_name: str, new_flags: dict[str, bool]) -> None: + """ + Merge *new_flags* into the ``param_name`` param on *obj*, creating it if absent. + + The public entry point + (:func:`~airflow.providers.openlineage.api.emission_policy.extend_global_openlineage_emission_policy`) + validates that *obj* is a DAG or task-like object before reaching here, so a missing + ``params`` attribute indicates a programmer error (e.g. an incorrectly-mocked object + in tests) — we raise loud rather than silently dropping flags. 
+ """ + from airflow.providers.common.compat.sdk import Param + + params = getattr(obj, "params", None) + if params is None: + raise TypeError( + f"extend_global_openlineage_emission_policy: {type(obj).__name__} instance has no " + "'params' attribute — cannot store flow-control flags. " + "Pass a DAG, an Operator, or an XComArg." + ) + existing = params.get(param_name) + if existing is not None: + existing_val = existing.value if isinstance(existing, Param) else existing + if isinstance(existing_val, dict): + new_flags = {**existing_val, **new_flags} + params[param_name] = Param(new_flags) + + +def _log_disabled(field: str, context: str, rule: dict | None) -> None: + """Log an INFO-level audit message when a control field is non-default.""" + if rule is not None: + log.info( + "emission_policy: '%s' disabled for %s — matched by rule: %r", + field, + context, + rule, + ) + else: + log.info( + "emission_policy: '%s' suppressed for %s — selective_enable opt-in not detected", + field, + context, + ) + + +def _log_enabled(field: str, context: str, rule: dict | None) -> None: + """Log an INFO-level audit message when a normally-false field is enabled by a rule.""" + if rule is not None: + log.info( + "emission_policy: '%s' enabled for %s — matched by rule: %r", + field, + context, + rule, + ) + + +def _audit_log_updates(updates: dict[str, bool], context: str, source: dict) -> None: + """Audit-log every non-default value introduced by *updates* in one pass.""" + for field, value in updates.items(): + if field in _AUDIT_DISABLE_FIELDS and not value: + _log_disabled(field, context, source) + elif field in _AUDIT_ENABLE_FIELDS and value: + _log_enabled(field, context, source) + + +def _validate_rule(rule: dict) -> bool: + """ + Return ``True`` if the rule matches the nested schema; warn and return ``False`` otherwise. 
+ + A valid rule has exactly these top-level keys: ``scope`` (dict, possibly empty), + ``controls`` (non-empty dict), and the optionals ``match_mode`` / ``locked``. + Unknown keys at any level cause the rule to be skipped — that catches user + typos like ``{"scope": {"dgg_id": "x"}}``. + """ + unknown_top = set(rule) - _ALLOWED_TOP_LEVEL_KEYS + if unknown_top: + log.warning( + "emission_policy rule has unknown top-level key(s) %s (allowed: %s); ignoring: %r", + sorted(unknown_top), + sorted(_ALLOWED_TOP_LEVEL_KEYS), + rule, + ) + return False + + if RULE_SCOPE not in rule: + log.warning( + "emission_policy rule is missing required 'scope' key (use 'scope': {} for global); ignoring: %r", + rule, + ) + return False + scope = rule[RULE_SCOPE] + if not isinstance(scope, dict): + log.warning( + "emission_policy rule 'scope' must be a dict (use {} for global), got %r; ignoring: %r", + type(scope).__name__, + rule, + ) + return False + unknown_scope = set(scope) - _ALLOWED_SCOPE_KEYS + if unknown_scope: + log.warning( + "emission_policy rule 'scope' has unknown key(s) %s (allowed: %s); ignoring: %r", + sorted(unknown_scope), + sorted(_ALLOWED_SCOPE_KEYS), + rule, + ) + return False + has_dag = SCOPE_DAG_ID in scope + has_task = SCOPE_TASK_ID in scope + has_op = SCOPE_OPERATOR in scope + if has_task and not has_dag: + log.warning( + "emission_policy rule scope has 'task_id' without 'dag_id'; ignoring: %r", + rule, + ) + return False + if has_op and (has_dag or has_task): + log.warning( + "emission_policy rule scope combines 'operator' with 'dag_id'/'task_id' " + "(must be exactly one of: global, operator-only, dag-only, dag+task); ignoring: %r", + rule, + ) + return False + + if RULE_CONTROLS not in rule: + log.warning( + "emission_policy rule is missing required 'controls' dict; ignoring: %r", + rule, + ) + return False + controls = rule[RULE_CONTROLS] + if not isinstance(controls, dict): + log.warning( + "emission_policy rule 'controls' must be a dict, got %r; ignoring: 
%r", + type(controls).__name__, + rule, + ) + return False + if not controls: + log.warning( + "emission_policy rule has empty 'controls' dict " + "(at least one control flag is required); ignoring: %r", + rule, + ) + return False + unknown_controls = set(controls) - _ALLOWED_CONTROL_KEYS + if unknown_controls: + log.warning( + "emission_policy rule 'controls' has unknown key(s) %s (allowed: %s); ignoring: %r", + sorted(unknown_controls), + sorted(_ALLOWED_CONTROL_KEYS), + rule, + ) + return False + for k, v in controls.items(): + if not isinstance(v, bool): + log.warning( + "emission_policy rule 'controls.%s' must be bool, got %r; ignoring: %r", + k, + v, + rule, + ) + return False + + if has_op and EMIT_DAG_EVENTS in controls: + log.warning( + "emission_policy rule has scope 'operator' with controls 'emit_dag_events' which is meaningless " + "(operators are not associated with DAG run events); ignoring: %r", + rule, + ) + return False + if has_task and EMIT_DAG_EVENTS in controls: + log.warning( + "emission_policy rule has scope 'task_id' with controls 'emit_dag_events' which is meaningless " + "(task-specific rules do not target DAG run events); ignoring: %r", + rule, + ) + return False + + match_mode = rule.get(RULE_MATCH_MODE, "exact") + if match_mode not in ("exact", "regex"): + log.warning( + "emission_policy rule has invalid 'match_mode' %r (must be 'exact' or 'regex'); ignoring: %r", + match_mode, + rule, + ) + return False + if match_mode == "regex": + for key in (SCOPE_DAG_ID, SCOPE_TASK_ID, SCOPE_OPERATOR): + if key in scope: + try: + re.compile(scope[key]) + except re.error as exc: + log.warning( + "emission_policy rule scope.'%s' pattern %r is not a valid regex (%s); ignoring: %r", + key, + scope[key], + exc, + rule, + ) + return False + + if RULE_LOCKED in rule and not isinstance(rule[RULE_LOCKED], bool): + log.warning( + "emission_policy rule 'locked' must be bool, got %r; ignoring: %r", + rule[RULE_LOCKED], + rule, + ) + return False + # Under the 
nested schema every allowed control key is lockable (validation already + # requires non-empty controls with keys from the allowed set), so there is no + # "lock with no lockable field" warning case to raise. + + return True + + +def _classify_rules( + rules: list[dict], + fqcn: str, + dag_id: str, + task_id: str, +) -> tuple[list[dict], list[dict], list[dict], list[dict]]: + """ + Classify a flat list of validated rules into the four priority tiers for task resolution. + + Returns ``(task_rules, dag_rules, operator_rules, global_rules)``. + """ + task_rules: list[dict] = [] + dag_rules: list[dict] = [] + operator_rules: list[dict] = [] + global_rules: list[dict] = [] + + for rule in rules: + if not isinstance(rule, dict): + log.warning("emission_policy entry is not a dict: %r; ignoring.", rule) + continue + if not _validate_rule(rule): Review Comment: Changed to frozen rule. ########## providers/openlineage/src/airflow/providers/openlineage/utils/emission_policy.py: ########## @@ -0,0 +1,1066 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Per-scope emission policy resolution for the OpenLineage provider. 
+ +The ``emission_policy`` Airflow configuration option (``[openlineage]`` section) +accepts a JSON array of rule objects. Each rule has the shape:: + + { + "scope": {<scope keys>}, # required, may be empty for "global" + "match_mode": "exact" | "regex", # optional, default "exact" + "controls": {<control flags>}, # required, must be non-empty + "locked": true | false # optional, default false + } + +Top-level keys outside the four above are rejected with a WARNING and the rule +is skipped. + +Scope keys (all optional inside ``scope``): + +- ``operator`` — fully-qualified operator class name; matches task events for that + operator type. +- ``dag_id`` — matches task and/or dag run events for all tasks / dag runs belonging + to the named DAG. Use ``emit_task_events`` / ``emit_dag_events`` to target one + event type selectively. +- ``task_id`` — only valid alongside ``dag_id``; targets a specific task. +- *(empty ``scope: {}``)* — global default override; applies to every task and DAG event. + +``operator`` cannot be combined with ``dag_id`` or ``task_id`` (scope is one of: +global, operator-only, dag-only, dag+task). Rules with ``task_id`` but no ``dag_id``, and +rules that combine ``operator`` or ``task_id`` with ``emit_dag_events``, are also +rejected with a WARNING. + +``match_mode`` lives at the top level: ``"exact"`` (default) or ``"regex"``. +When set to ``"regex"``, every value inside ``scope`` (``dag_id``, ``task_id``, +``operator``) is treated as a ``re.fullmatch`` pattern. + +Control flag keys (all optional inside ``controls``; the dict must be non-empty): + +- ``emit`` — shorthand covering ALL OpenLineage events in scope (both task and dag + run events); set to ``false`` to disable them. Default: ``true``. +- ``emit_task_events`` — controls task-level events only (``false`` disables them); takes + precedence over ``emit`` for task event decisions. Default: ``true``. +- ``emit_dag_events`` — controls dag-run-level events only (``false`` disables them); takes + precedence over ``emit`` for dag event decisions. Default: ``true``. 
+- ``extract_operator_metadata`` — whether to run operator-specific extractor-based metadata + collection. When ``true``, the extractor manager calls the operator's OpenLineage extractor + (if registered), which may produce dataset inputs/outputs, job facets, run facets, and + other operator-specific metadata. When ``false``, the entire extraction pipeline is skipped + and a minimal event is emitted. Only meaningful for task events. Default: ``true``. +- ``include_source_code`` — whether to include operator source code in the + ``SourceCodeJobFacet`` for Python and Bash operators. Only meaningful for task events + when ``extract_operator_metadata`` is also ``true``. Default: ``true``. +- ``hook_lineage`` — whether to use ``HookLineageCollector`` as a fallback when the + extractor finds no inputs/outputs. **Has no effect when ``extract_operator_metadata`` + is ``false``** — the entire extraction pipeline (including hook lineage) is skipped. + Only meaningful for task events. Default: ``true``. +- ``include_full_task_info`` — whether to include the full serialised operator state + in the ``AirflowRunFacet``. When ``false`` (default), only a curated subset of + task attributes is sent. Only meaningful for task events. Default: ``false``. + +``locked: true`` is an admin floor lock that prevents per-Dag / per-task authoring +overrides from changing the field(s) carried by this rule's ``controls`` dict. + +Flag hierarchy +~~~~~~~~~~~~~~ + +Flags are not fully independent — some only take effect when a higher-level flag is +enabled: + +- ``extract_operator_metadata: false`` skips the **entire** operator extraction pipeline. + When this is ``false``, both ``include_source_code`` and ``hook_lineage`` have **no + effect** regardless of their values, because the code paths they control are never + reached. +- ``include_source_code`` only applies inside Python and Bash operator extractors; setting + it to ``false`` on other operator types is a no-op. 
+ +Priority for **task events** (most specific tier wins; within a tier, last matching +rule wins): + +1. ``dag_id`` + ``task_id`` +2. ``dag_id`` +3. ``operator`` +4. Global (no match keys) +5. Built-in defaults + +For ``emit`` at the task level: ``emit_task_events`` beats ``emit`` within the same +rule. + +Priority for **DAG run events**: + +1. ``dag_id`` rule (using ``emit_dag_events`` or ``emit``) +2. Global (no match keys) +3. Built-in defaults + +For ``emit`` at the dag level: ``emit_dag_events`` beats ``emit`` within the same +rule. + +Legacy config translation +------------------------- + +Any active legacy config option (``disabled_for_operators``, ``disable_source_code``, +``include_full_task_info``, ``selective_enable``) is **always** translated into equivalent +``emission_policy`` rules, regardless of whether ``emission_policy`` itself is set. The +translated rules are prepended before any user-provided rules, so user rules win within +each priority tier (last-wins). A ``DeprecationWarning`` is issued listing every translated +option — silence it by migrating those options into ``emission_policy`` exclusively. + +When no legacy option is active and ``emission_policy`` is empty, the resolver returns +the built-in defaults with no warnings. + +Audit logging +------------- + +Every resolved field that is non-default — whether from a rule or from a legacy +translation — is logged at INFO level, identifying the field, the event context, +and the exact rule that caused the change. 
+""" + +from __future__ import annotations + +import logging +import re +import warnings +from dataclasses import dataclass, replace +from typing import TYPE_CHECKING + +from airflow.exceptions import AirflowProviderDeprecationWarning +from airflow.providers.openlineage import conf as _ol_conf + +if TYPE_CHECKING: + from airflow.providers.openlineage.utils.utils import AnyOperator + +log = logging.getLogger(__name__) + +# Sentinel used during field resolution — distinct from ``False``. +_UNSET = object() + +# Control flag names — the keys that may appear inside a rule's ``controls`` dict. +EMIT = "emit" +EMIT_TASK_EVENTS = "emit_task_events" +EMIT_DAG_EVENTS = "emit_dag_events" +EXTRACT_OPERATOR_METADATA = "extract_operator_metadata" +INCLUDE_SOURCE_CODE = "include_source_code" +HOOK_LINEAGE = "hook_lineage" +INCLUDE_FULL_TASK_INFO = "include_full_task_info" + +# Scope keys — the keys that may appear inside a rule's ``scope`` dict. +SCOPE_DAG_ID = "dag_id" +SCOPE_TASK_ID = "task_id" +SCOPE_OPERATOR = "operator" + +# Top-level rule keys. +RULE_SCOPE = "scope" +RULE_CONTROLS = "controls" +RULE_MATCH_MODE = "match_mode" +RULE_LOCKED = "locked" + +# Param names used by the authoring API to store flow-control flags on operator / DAG +# objects. Kept in this module (rather than the api module) so the resolver can read +# them without a circular import. +OL_EMISSION_POLICY_TASK_PARAM = "_ol_emission_policy" +OL_EMISSION_POLICY_DAG_PARAM = "_ol_emission_policy_dag" + +# Schema enforcement: only these keys may appear at each level. Unknown keys cause +# the rule to be skipped with a WARNING (catches typos like {"scope": {"dgg_id": "x"}}). 
+_ALLOWED_TOP_LEVEL_KEYS: frozenset[str] = frozenset({RULE_SCOPE, RULE_MATCH_MODE, RULE_CONTROLS, RULE_LOCKED}) +_ALLOWED_SCOPE_KEYS: frozenset[str] = frozenset({SCOPE_DAG_ID, SCOPE_TASK_ID, SCOPE_OPERATOR}) +_ALLOWED_CONTROL_KEYS: frozenset[str] = frozenset( + { + EMIT, + EMIT_TASK_EVENTS, + EMIT_DAG_EVENTS, + EXTRACT_OPERATOR_METADATA, + INCLUDE_SOURCE_CODE, + HOOK_LINEAGE, + INCLUDE_FULL_TASK_INFO, + } +) + +# Fields scanned for ``locked: true`` on task-event resolution. +# ``emit`` is intentionally absent: emit-locking is driven by either ``emit`` or +# ``emit_task_events`` appearing in a locked rule (handled separately in +# :func:`_compute_locked_task_fields`), so listing it here would double-count. +_LOCKABLE_TASK_FIELDS: tuple[str, ...] = ( + EXTRACT_OPERATOR_METADATA, + INCLUDE_SOURCE_CODE, + HOOK_LINEAGE, + INCLUDE_FULL_TASK_INFO, +) + +# Authoring flag classification: which keys are relevant for task-level vs DAG-run +# resolution. Used by the authoring API to split a single call into the two stored +# param dicts. +_TASK_FLAG_KEYS: frozenset[str] = frozenset( + { + EMIT, + EMIT_TASK_EVENTS, + EXTRACT_OPERATOR_METADATA, + INCLUDE_SOURCE_CODE, + HOOK_LINEAGE, + INCLUDE_FULL_TASK_INFO, + } +) +_DAG_FLAG_KEYS: frozenset[str] = frozenset({EMIT, EMIT_DAG_EVENTS}) + +# Audit-log direction tables. Fields default-true → log on transition to false; +# fields default-false → log on transition to true. 
+_AUDIT_DISABLE_FIELDS: frozenset[str] = frozenset( + {EMIT, EXTRACT_OPERATOR_METADATA, INCLUDE_SOURCE_CODE, HOOK_LINEAGE} +) +_AUDIT_ENABLE_FIELDS: frozenset[str] = frozenset({INCLUDE_FULL_TASK_INFO}) + + +@dataclass(frozen=True) +class EmissionPolicy: + """Resolved emission policy for a specific event context.""" + + emit: bool + extract_operator_metadata: bool + include_source_code: bool + hook_lineage: bool + include_full_task_info: bool + + @classmethod + def defaults(cls) -> EmissionPolicy: + """Return the default policy (all controls at their built-in defaults).""" + return cls( + emit=True, + extract_operator_metadata=True, + include_source_code=True, + hook_lineage=True, + include_full_task_info=False, + ) + + +def _matches(pattern: str, value: str, match_mode: str) -> bool: + """Return ``True`` if *value* matches *pattern* according to *match_mode*.""" + if match_mode == "regex": + return bool(re.fullmatch(pattern, value)) + return pattern == value + + +def _read_param(obj: object, param_name: str) -> dict[str, bool]: + """Read the flags dict stored at *param_name* on *obj*, or ``{}`` if absent.""" + from airflow.providers.common.compat.sdk import Param + + val = getattr(obj, "params", {}).get(param_name) + if val is None: + return {} + if isinstance(val, Param): + val = val.value + return val if isinstance(val, dict) else {} + + +def _merge_param(obj: object, param_name: str, new_flags: dict[str, bool]) -> None: + """ + Merge *new_flags* into the ``param_name`` param on *obj*, creating it if absent. + + The public entry point + (:func:`~airflow.providers.openlineage.api.emission_policy.extend_global_openlineage_emission_policy`) + validates that *obj* is a DAG or task-like object before reaching here, so a missing + ``params`` attribute indicates a programmer error (e.g. an incorrectly-mocked object + in tests) — we raise loud rather than silently dropping flags. 
+ """ + from airflow.providers.common.compat.sdk import Param + + params = getattr(obj, "params", None) + if params is None: + raise TypeError( + f"extend_global_openlineage_emission_policy: {type(obj).__name__} instance has no " + "'params' attribute — cannot store flow-control flags. " + "Pass a DAG, an Operator, or an XComArg." + ) + existing = params.get(param_name) + if existing is not None: + existing_val = existing.value if isinstance(existing, Param) else existing + if isinstance(existing_val, dict): + new_flags = {**existing_val, **new_flags} + params[param_name] = Param(new_flags) + + +def _log_disabled(field: str, context: str, rule: dict | None) -> None: Review Comment: Removed stale code from previous implementation idea. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
