mobuchowski commented on code in PR #66992:
URL: https://github.com/apache/airflow/pull/66992#discussion_r3322897988


##########
providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py:
##########
@@ -143,7 +158,7 @@ def extract_metadata(
                     task_info,
                 )
                 self.log.debug("OpenLineage extraction failure details:", exc_info=True)
-        else:
+        elif controls.hook_lineage:

Review Comment:
   
   The extract_metadata refactor changed the no-extractor branch so that the 
unknownSourceAttribute run facet + extract_inlets_and_outlets fallback only 
runs when hook_lineage is disabled. With default controls (hook_lineage=True), 
an operator with no registered extractor and no hook-collected lineage 
(get_hook_lineage() returns None) falls through to a bare return 
OperatorLineage(), losing both the unknownSourceAttribute facet and any 
manually-declared inlets/outlets. This is a broad, silent lineage-content 
regression on the default configuration.
   No test covers this path: test_manager.py was not modified by the PR, so CI 
won't catch it.
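
   A regression test for this path could look roughly like the sketch below. 
It models the no-extractor branch with toy stand-ins rather than the real 
ExtractorManager, so Lineage, Controls and no_extractor_branch are hypothetical 
names and the facet payload is a placeholder. Only the intended behaviour (keep 
the unknownSourceAttribute + inlets/outlets fallback when hook lineage yields 
nothing) is taken from the comment above.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Lineage:
    """Toy stand-in for OperatorLineage (hypothetical, not the provider class)."""

    inputs: list[str] = field(default_factory=list)
    outputs: list[str] = field(default_factory=list)
    run_facets: dict[str, object] = field(default_factory=dict)


@dataclass
class Controls:
    """Toy stand-in for the resolved policy; only the flag discussed here."""

    hook_lineage: bool = True


def no_extractor_branch(
    controls: Controls,
    hook_result: Lineage | None,
    inlets: list[str],
    outlets: list[str],
) -> Lineage:
    """Model of the branch taken when no extractor is registered."""
    # Prefer hook-collected lineage when the flag is on and something was collected.
    if controls.hook_lineage and hook_result is not None:
        return hook_result
    # Otherwise keep the pre-refactor fallback instead of returning an empty object.
    return Lineage(
        inputs=list(inlets),
        outputs=list(outlets),
        run_facets={"unknownSourceAttribute": {"placeholder": True}},
    )


def check_default_controls_keep_fallback() -> None:
    """Default controls, no extractor, no hook lineage: fallback must survive."""
    result = no_extractor_branch(Controls(), None, ["in_table"], ["out_table"])
    assert result.inputs == ["in_table"]
    assert result.outputs == ["out_table"]
    assert "unknownSourceAttribute" in result.run_facets
```

   In the real test the same three assertions would be made against 
ExtractorManager.extract_metadata with an operator that has no registered 
extractor.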
   



##########
providers/openlineage/src/airflow/providers/openlineage/conf.py:
##########
@@ -179,3 +179,12 @@ def include_full_task_info() -> bool:
 def debug_mode() -> bool:
     """[openlineage] debug_mode."""
     return conf.getboolean(_CONFIG_SECTION, "debug_mode", fallback="False")
+
+
+@cache
+def emission_policy() -> list[dict]:
+    """[openlineage] emission_policy."""
+    option = conf.getjson(_CONFIG_SECTION, "emission_policy", fallback=[])
+    if not isinstance(option, list):
+        raise ValueError(f"[openlineage] emission_policy must be a JSON array, got: {type(option).__name__}")
+    return option

Review Comment:
   resolve_task_emission_policy(...) is called at the top of each 
_on_task_instance_* hook, before the @print_warning-decorated inner function 
and outside the try block in the manual-state-change path. It calls 
conf.emission_policy(), which now does conf.getjson(...) and raises ValueError 
for a config that is valid JSON but not a list. Unlike the three dag-run hooks 
(which wrap their bodies in try/except BaseException), the four task-instance 
hooks have no such guard. So a single config typo (e.g. emission_policy = {...} 
instead of [{...}]) now lets a ValueError escape into Airflow's 
task-notification path on every task event, instead of degrading gracefully as 
on main. The asymmetry with the dag-run hooks is almost certainly unintended.
   Location: 
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:185, 
342, ~497, ~636, 764; root cause conf.py:188-190
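
   One possible shape for the missing guard is sketched below. It is 
self-contained: read_emission_policy only imitates what conf.emission_policy() 
does in this PR (parse JSON, require a list), and safe_emission_policy is a 
hypothetical helper, not something that exists in the provider. Whether to fall 
back to "no rules" or to disable emission entirely is a design decision left 
open here.

```python
import json
import logging

log = logging.getLogger(__name__)


def read_emission_policy(raw: str) -> list:
    """Stand-in for conf.emission_policy(): parse JSON and require a list."""
    option = json.loads(raw)
    if not isinstance(option, list):
        raise ValueError(
            f"[openlineage] emission_policy must be a JSON array, got: {type(option).__name__}"
        )
    return option


def safe_emission_policy(raw: str) -> list:
    """Hypothetical guard: never let a config error escape into a listener hook."""
    try:
        return read_emission_policy(raw)
    except ValueError:
        # json.JSONDecodeError subclasses ValueError, so malformed JSON lands here too.
        log.warning("Invalid emission_policy; falling back to no rules", exc_info=True)
        return []
```

   With this in place, emission_policy = {...} produces one warning per 
resolution and an empty rule list instead of an exception in the 
task-notification path.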



##########
providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py:
##########
@@ -184,12 +204,12 @@ def method_exists(method_name):
             return self.default_extractor
         return None
 
-    def _get_extractor(self, task: BaseOperator) -> BaseExtractor | None:
-        extractor = self.get_extractor_class(task)
-        self.log.debug("extractor for %s is %s", task.task_type, extractor)
-        if extractor:
-            return extractor(task)
-        return None
+    def _get_extractor(self, task: BaseOperator, source_code_enabled: bool = True) -> BaseExtractor | None:
+        extractor_cls = self.get_extractor_class(task)
+        self.log.debug("extractor for %s is %s", task.task_type, extractor_cls)
+        if extractor_cls is None:
+            return None
+        return extractor_cls(task, source_code_enabled=source_code_enabled)

Review Comment:
   source_code_enabled is passed as a new keyword to every extractor 
constructor in _get_extractor(). Custom extractors following the 
previously-public shape __init__(self, operator) will raise TypeError at 
construction, before extraction begins. The result is worse than the flag's 
intent: instead of merely omitting source code, the entire task event is 
dropped for deployments using such custom extractors. Recommendation: set the 
flag after construction (e.g. extractor.source_code_enabled = ...), or fall 
back when the kwarg isn't accepted.
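
   Both recommendations can be combined, as in the sketch below. The two 
extractor classes are toy examples and build_extractor is a hypothetical 
helper; the only thing taken from the PR is the source_code_enabled keyword. It 
inspects the constructor signature rather than catching TypeError, so a 
TypeError raised inside a custom __init__ is not silently swallowed.

```python
import inspect


class LegacyExtractor:
    """Custom extractor with the old constructor shape: __init__(self, operator)."""

    def __init__(self, operator):
        self.operator = operator


class NewExtractor:
    """Extractor that already accepts the new keyword."""

    def __init__(self, operator, source_code_enabled=True):
        self.operator = operator
        self.source_code_enabled = source_code_enabled


def build_extractor(extractor_cls, task, source_code_enabled=True):
    """Hypothetical helper: pass the kwarg only if the constructor accepts it."""
    params = inspect.signature(extractor_cls).parameters
    accepts_kwarg = "source_code_enabled" in params or any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    if accepts_kwarg:
        return extractor_cls(task, source_code_enabled=source_code_enabled)
    # Old-style constructor: build it the old way, then set the flag afterwards.
    extractor = extractor_cls(task)
    extractor.source_code_enabled = source_code_enabled
    return extractor
```

   An old-style extractor built this way still emits its event; whether it 
honours the flag depends on whether its own code reads source_code_enabled.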



##########
providers/openlineage/src/airflow/providers/openlineage/utils/emission_policy.py:
##########
@@ -0,0 +1,1066 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Per-scope emission policy resolution for the OpenLineage provider.
+
+The ``emission_policy`` Airflow configuration option (``[openlineage]`` 
section)
+accepts a JSON array of rule objects.  Each rule has the shape::
+
+    {
+        "scope":      {<scope keys>},        # required, may be empty for 
"global"
+        "match_mode": "exact" | "regex",     # optional, default "exact"
+        "controls":   {<control flags>},     # required, must be non-empty
+        "locked":     true | false           # optional, default false
+    }
+
+Top-level keys outside the four above are rejected with a WARNING and the rule
+is skipped.
+
+Scope keys (all optional inside ``scope``):
+
+- ``operator`` — fully-qualified operator class name; matches task events for 
that
+  operator type.
+- ``dag_id`` — matches task and/or dag run events for all tasks / dag runs 
belonging
+  to the named DAG.  Use ``emit_task_events`` / ``emit_dag_events`` to target 
one
+  event type selectively.
+- ``task_id`` — only valid alongside ``dag_id``; targets a specific task.
+- *(empty ``scope: {}``)* — global default override; applies to every task and 
DAG event.
+
+``operator`` cannot be combined with ``dag_id`` or ``task_id`` (scope is one 
of:
+global, operator-only, dag-only, dag+task).  ``task_id`` without ``dag_id`` and
+``operator`` combined with ``emit_dag_events`` (or ``task_id`` combined with
+``emit_dag_events``) are also rejected with a WARNING.
+
+``match_mode`` lives at the top level: ``"exact"`` (default) or ``"regex"``.
+When set to ``"regex"``, every value inside ``scope`` (``dag_id``, ``task_id``,
+``operator``) is treated as a ``re.fullmatch`` pattern.
+
+Control flag keys (all optional inside ``controls``; the dict must be 
non-empty):
+
+- ``emit`` — shorthand: disable ALL OpenLineage events in scope (both task and 
dag
+  run events).  Default: ``true``.
+- ``emit_task_events`` — disable task-level events only; takes precedence over
+  ``emit`` for task event decisions.  Default: ``true``.
+- ``emit_dag_events`` — disable dag-run-level events only; takes precedence 
over
+  ``emit`` for dag event decisions.  Default: ``true``.
+- ``extract_operator_metadata`` — whether to run operator-specific 
extractor-based metadata
+  collection.  When ``true``, the extractor manager calls the operator's 
OpenLineage extractor
+  (if registered), which may produce dataset inputs/outputs, job facets, run 
facets, and
+  other operator-specific metadata.  When ``false``, the entire extraction 
pipeline is skipped
+  and a minimal event is emitted.  Only meaningful for task events.  Default: 
``true``.
+- ``include_source_code`` — whether to include operator source code in the
+  ``SourceCodeJobFacet`` for Python and Bash operators.  Only meaningful for 
task events
+  when ``extract_operator_metadata`` is also ``true``.  Default: ``true``.
+- ``hook_lineage`` — whether to use ``HookLineageCollector`` as a fallback 
when the
+  extractor finds no inputs/outputs.  **Has no effect when 
``extract_operator_metadata``
+  is ``false``** — the entire extraction pipeline (including hook lineage) is 
skipped.
+  Only meaningful for task events.  Default: ``true``.
+- ``include_full_task_info`` — whether to include the full serialised operator 
state
+  in the ``AirflowRunFacet``.  When ``false`` (default), only a curated subset 
of
+  task attributes is sent.  Only meaningful for task events.  Default: 
``false``.
+
+``locked: true`` is an admin floor lock that prevents per-Dag / per-task 
authoring
+overrides from changing the field(s) carried by this rule's ``controls`` dict.
+
+Flag hierarchy
+~~~~~~~~~~~~~~
+
+Flags are not fully independent — some only take effect when a higher-level 
flag is
+enabled:
+
+- ``extract_operator_metadata: false`` skips the **entire** operator 
extraction pipeline.
+  When this is ``false``, both ``include_source_code`` and ``hook_lineage`` 
have **no
+  effect** regardless of their values, because the code paths they control are 
never
+  reached.
+- ``include_source_code`` only applies inside Python and Bash operator 
extractors; setting
+  it to ``false`` on other operator types is a no-op.
+
+Priority for **task events** (most specific tier wins; within a tier, last 
matching
+rule wins):
+
+1. ``dag_id`` + ``task_id``
+2. ``dag_id``
+3. ``operator``
+4. Global (no match keys)
+5. Built-in defaults
+
+For ``emit`` at the task level: ``emit_task_events`` beats ``emit`` within the 
same
+rule.
+
+Priority for **DAG run events**:
+
+1. ``dag_id`` rule (using ``emit_dag_events`` or ``emit``)
+2. Global (no match keys)
+3. Built-in defaults
+
+For ``emit`` at the dag level: ``emit_dag_events`` beats ``emit`` within the 
same
+rule.
+
+Legacy config translation
+-------------------------
+
+Any active legacy config option (``disabled_for_operators``, 
``disable_source_code``,
+``include_full_task_info``, ``selective_enable``) is **always** translated 
into equivalent
+``emission_policy`` rules, regardless of whether ``emission_policy`` itself is 
set. The
+translated rules are prepended before any user-provided rules, so user rules 
win within
+each priority tier (last-wins). A ``DeprecationWarning`` is issued listing 
every translated
+option — silence it by migrating those options into ``emission_policy`` 
exclusively.
+
+When no legacy option is active and ``emission_policy`` is empty, the resolver 
returns
+the built-in defaults with no warnings.
+
+Audit logging
+-------------
+
+Every resolved field that is non-default — whether from a rule or from a legacy
+translation — is logged at INFO level, identifying the field, the event 
context,
+and the exact rule that caused the change.
+"""
+
+from __future__ import annotations
+
+import logging
+import re
+import warnings
+from dataclasses import dataclass, replace
+from typing import TYPE_CHECKING
+
+from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.providers.openlineage import conf as _ol_conf
+
+if TYPE_CHECKING:
+    from airflow.providers.openlineage.utils.utils import AnyOperator
+
+log = logging.getLogger(__name__)
+
+# Sentinel used during field resolution — distinct from ``False``.
+_UNSET = object()
+
+# Control flag names — the keys that may appear inside a rule's ``controls`` 
dict.
+EMIT = "emit"
+EMIT_TASK_EVENTS = "emit_task_events"
+EMIT_DAG_EVENTS = "emit_dag_events"
+EXTRACT_OPERATOR_METADATA = "extract_operator_metadata"
+INCLUDE_SOURCE_CODE = "include_source_code"
+HOOK_LINEAGE = "hook_lineage"
+INCLUDE_FULL_TASK_INFO = "include_full_task_info"
+
+# Scope keys — the keys that may appear inside a rule's ``scope`` dict.
+SCOPE_DAG_ID = "dag_id"
+SCOPE_TASK_ID = "task_id"
+SCOPE_OPERATOR = "operator"
+
+# Top-level rule keys.
+RULE_SCOPE = "scope"
+RULE_CONTROLS = "controls"
+RULE_MATCH_MODE = "match_mode"
+RULE_LOCKED = "locked"
+
+# Param names used by the authoring API to store flow-control flags on 
operator / DAG
+# objects.  Kept in this module (rather than the api module) so the resolver 
can read
+# them without a circular import.
+OL_EMISSION_POLICY_TASK_PARAM = "_ol_emission_policy"
+OL_EMISSION_POLICY_DAG_PARAM = "_ol_emission_policy_dag"
+
+# Schema enforcement: only these keys may appear at each level.  Unknown keys 
cause
+# the rule to be skipped with a WARNING (catches typos like {"scope": 
{"dgg_id": "x"}}).
+_ALLOWED_TOP_LEVEL_KEYS: frozenset[str] = frozenset({RULE_SCOPE, 
RULE_MATCH_MODE, RULE_CONTROLS, RULE_LOCKED})
+_ALLOWED_SCOPE_KEYS: frozenset[str] = frozenset({SCOPE_DAG_ID, SCOPE_TASK_ID, 
SCOPE_OPERATOR})
+_ALLOWED_CONTROL_KEYS: frozenset[str] = frozenset(
+    {
+        EMIT,
+        EMIT_TASK_EVENTS,
+        EMIT_DAG_EVENTS,
+        EXTRACT_OPERATOR_METADATA,
+        INCLUDE_SOURCE_CODE,
+        HOOK_LINEAGE,
+        INCLUDE_FULL_TASK_INFO,
+    }
+)
+
+# Fields scanned for ``locked: true`` on task-event resolution.
+# ``emit`` is intentionally absent: emit-locking is driven by either ``emit`` 
or
+# ``emit_task_events`` appearing in a locked rule (handled separately in
+# :func:`_compute_locked_task_fields`), so listing it here would double-count.
+_LOCKABLE_TASK_FIELDS: tuple[str, ...] = (
+    EXTRACT_OPERATOR_METADATA,
+    INCLUDE_SOURCE_CODE,
+    HOOK_LINEAGE,
+    INCLUDE_FULL_TASK_INFO,
+)
+
+# Authoring flag classification: which keys are relevant for task-level vs 
DAG-run
+# resolution.  Used by the authoring API to split a single call into the two 
stored
+# param dicts.
+_TASK_FLAG_KEYS: frozenset[str] = frozenset(
+    {
+        EMIT,
+        EMIT_TASK_EVENTS,
+        EXTRACT_OPERATOR_METADATA,
+        INCLUDE_SOURCE_CODE,
+        HOOK_LINEAGE,
+        INCLUDE_FULL_TASK_INFO,
+    }
+)
+_DAG_FLAG_KEYS: frozenset[str] = frozenset({EMIT, EMIT_DAG_EVENTS})
+
+# Audit-log direction tables.  Fields default-true → log on transition to 
false;
+# fields default-false → log on transition to true.
+_AUDIT_DISABLE_FIELDS: frozenset[str] = frozenset(
+    {EMIT, EXTRACT_OPERATOR_METADATA, INCLUDE_SOURCE_CODE, HOOK_LINEAGE}
+)
+_AUDIT_ENABLE_FIELDS: frozenset[str] = frozenset({INCLUDE_FULL_TASK_INFO})
+
+
+@dataclass(frozen=True)
+class EmissionPolicy:
+    """Resolved emission policy for a specific event context."""
+
+    emit: bool
+    extract_operator_metadata: bool
+    include_source_code: bool
+    hook_lineage: bool
+    include_full_task_info: bool
+
+    @classmethod
+    def defaults(cls) -> EmissionPolicy:
+        """Return the default policy (all controls at their built-in 
defaults)."""
+        return cls(
+            emit=True,
+            extract_operator_metadata=True,
+            include_source_code=True,
+            hook_lineage=True,
+            include_full_task_info=False,
+        )
+
+
+def _matches(pattern: str, value: str, match_mode: str) -> bool:
+    """Return ``True`` if *value* matches *pattern* according to 
*match_mode*."""
+    if match_mode == "regex":
+        return bool(re.fullmatch(pattern, value))
+    return pattern == value
+
+
+def _read_param(obj: object, param_name: str) -> dict[str, bool]:
+    """Read the flags dict stored at *param_name* on *obj*, or ``{}`` if 
absent."""
+    from airflow.providers.common.compat.sdk import Param
+
+    val = getattr(obj, "params", {}).get(param_name)
+    if val is None:
+        return {}
+    if isinstance(val, Param):
+        val = val.value
+    return val if isinstance(val, dict) else {}
+
+
+def _merge_param(obj: object, param_name: str, new_flags: dict[str, bool]) -> 
None:
+    """
+    Merge *new_flags* into the ``param_name`` param on *obj*, creating it if 
absent.
+
+    The public entry point
+    
(:func:`~airflow.providers.openlineage.api.emission_policy.extend_global_openlineage_emission_policy`)
+    validates that *obj* is a DAG or task-like object before reaching here, so 
a missing
+    ``params`` attribute indicates a programmer error (e.g. an 
incorrectly-mocked object
+    in tests) — we raise loud rather than silently dropping flags.
+    """
+    from airflow.providers.common.compat.sdk import Param
+
+    params = getattr(obj, "params", None)
+    if params is None:
+        raise TypeError(
+            f"extend_global_openlineage_emission_policy: {type(obj).__name__} 
instance has no "
+            "'params' attribute — cannot store flow-control flags. "
+            "Pass a DAG, an Operator, or an XComArg."
+        )
+    existing = params.get(param_name)
+    if existing is not None:
+        existing_val = existing.value if isinstance(existing, Param) else 
existing
+        if isinstance(existing_val, dict):
+            new_flags = {**existing_val, **new_flags}
+    params[param_name] = Param(new_flags)
+
+
+def _log_disabled(field: str, context: str, rule: dict | None) -> None:
+    """Log an INFO-level audit message when a control field is non-default."""
+    if rule is not None:
+        log.info(
+            "emission_policy: '%s' disabled for %s — matched by rule: %r",
+            field,
+            context,
+            rule,
+        )
+    else:
+        log.info(
+            "emission_policy: '%s' suppressed for %s — selective_enable opt-in 
not detected",
+            field,
+            context,
+        )
+
+
+def _log_enabled(field: str, context: str, rule: dict | None) -> None:
+    """Log an INFO-level audit message when a normally-false field is enabled 
by a rule."""
+    if rule is not None:
+        log.info(
+            "emission_policy: '%s' enabled for %s — matched by rule: %r",
+            field,
+            context,
+            rule,
+        )
+
+
+def _audit_log_updates(updates: dict[str, bool], context: str, source: dict) 
-> None:
+    """Audit-log every non-default value introduced by *updates* in one 
pass."""
+    for field, value in updates.items():
+        if field in _AUDIT_DISABLE_FIELDS and not value:
+            _log_disabled(field, context, source)
+        elif field in _AUDIT_ENABLE_FIELDS and value:
+            _log_enabled(field, context, source)
+
+
+def _validate_rule(rule: dict) -> bool:
+    """
+    Return ``True`` if the rule matches the nested schema; warn and return 
``False`` otherwise.
+
+    A valid rule has exactly these top-level keys: ``scope`` (dict, possibly 
empty),
+    ``controls`` (non-empty dict), and the optionals ``match_mode`` / 
``locked``.
+    Unknown keys at any level cause the rule to be skipped — that catches user
+    typos like ``{"scope": {"dgg_id": "x"}}``.
+    """
+    unknown_top = set(rule) - _ALLOWED_TOP_LEVEL_KEYS
+    if unknown_top:
+        log.warning(
+            "emission_policy rule has unknown top-level key(s) %s (allowed: 
%s); ignoring: %r",
+            sorted(unknown_top),
+            sorted(_ALLOWED_TOP_LEVEL_KEYS),
+            rule,
+        )
+        return False
+
+    if RULE_SCOPE not in rule:
+        log.warning(
+            "emission_policy rule is missing required 'scope' key (use 
'scope': {} for global); ignoring: %r",
+            rule,
+        )
+        return False
+    scope = rule[RULE_SCOPE]
+    if not isinstance(scope, dict):
+        log.warning(
+            "emission_policy rule 'scope' must be a dict (use {} for global), 
got %r; ignoring: %r",
+            type(scope).__name__,
+            rule,
+        )
+        return False
+    unknown_scope = set(scope) - _ALLOWED_SCOPE_KEYS
+    if unknown_scope:
+        log.warning(
+            "emission_policy rule 'scope' has unknown key(s) %s (allowed: %s); 
ignoring: %r",
+            sorted(unknown_scope),
+            sorted(_ALLOWED_SCOPE_KEYS),
+            rule,
+        )
+        return False
+    has_dag = SCOPE_DAG_ID in scope
+    has_task = SCOPE_TASK_ID in scope
+    has_op = SCOPE_OPERATOR in scope
+    if has_task and not has_dag:
+        log.warning(
+            "emission_policy rule scope has 'task_id' without 'dag_id'; 
ignoring: %r",
+            rule,
+        )
+        return False
+    if has_op and (has_dag or has_task):
+        log.warning(
+            "emission_policy rule scope combines 'operator' with 
'dag_id'/'task_id' "
+            "(must be exactly one of: global, operator-only, dag-only, 
dag+task); ignoring: %r",
+            rule,
+        )
+        return False
+
+    if RULE_CONTROLS not in rule:
+        log.warning(
+            "emission_policy rule is missing required 'controls' dict; 
ignoring: %r",
+            rule,
+        )
+        return False
+    controls = rule[RULE_CONTROLS]
+    if not isinstance(controls, dict):
+        log.warning(
+            "emission_policy rule 'controls' must be a dict, got %r; ignoring: 
%r",
+            type(controls).__name__,
+            rule,
+        )
+        return False
+    if not controls:
+        log.warning(
+            "emission_policy rule has empty 'controls' dict "
+            "(at least one control flag is required); ignoring: %r",
+            rule,
+        )
+        return False
+    unknown_controls = set(controls) - _ALLOWED_CONTROL_KEYS
+    if unknown_controls:
+        log.warning(
+            "emission_policy rule 'controls' has unknown key(s) %s (allowed: 
%s); ignoring: %r",
+            sorted(unknown_controls),
+            sorted(_ALLOWED_CONTROL_KEYS),
+            rule,
+        )
+        return False
+    for k, v in controls.items():
+        if not isinstance(v, bool):
+            log.warning(
+                "emission_policy rule 'controls.%s' must be bool, got %r; 
ignoring: %r",
+                k,
+                v,
+                rule,
+            )
+            return False
+
+    if has_op and EMIT_DAG_EVENTS in controls:
+        log.warning(
+            "emission_policy rule has scope 'operator' with controls 
'emit_dag_events' which is meaningless "
+            "(operators are not associated with DAG run events); ignoring: %r",
+            rule,
+        )
+        return False
+    if has_task and EMIT_DAG_EVENTS in controls:
+        log.warning(
+            "emission_policy rule has scope 'task_id' with controls 
'emit_dag_events' which is meaningless "
+            "(task-specific rules do not target DAG run events); ignoring: %r",
+            rule,
+        )
+        return False
+
+    match_mode = rule.get(RULE_MATCH_MODE, "exact")
+    if match_mode not in ("exact", "regex"):
+        log.warning(
+            "emission_policy rule has invalid 'match_mode' %r (must be 'exact' 
or 'regex'); ignoring: %r",
+            match_mode,
+            rule,
+        )
+        return False
+    if match_mode == "regex":
+        for key in (SCOPE_DAG_ID, SCOPE_TASK_ID, SCOPE_OPERATOR):
+            if key in scope:
+                try:
+                    re.compile(scope[key])
+                except re.error as exc:
+                    log.warning(
+                        "emission_policy rule scope.'%s' pattern %r is not a 
valid regex (%s); ignoring: %r",
+                        key,
+                        scope[key],
+                        exc,
+                        rule,
+                    )
+                    return False
+
+    if RULE_LOCKED in rule and not isinstance(rule[RULE_LOCKED], bool):
+        log.warning(
+            "emission_policy rule 'locked' must be bool, got %r; ignoring: %r",
+            rule[RULE_LOCKED],
+            rule,
+        )
+        return False
+    # Under the nested schema every allowed control key is lockable 
(validation already
+    # requires non-empty controls with keys from the allowed set), so there is 
no
+    # "lock with no lockable field" warning case to raise.
+
+    return True
+
+
+def _classify_rules(
+    rules: list[dict],
+    fqcn: str,
+    dag_id: str,
+    task_id: str,
+) -> tuple[list[dict], list[dict], list[dict], list[dict]]:
+    """
+    Classify a flat list of validated rules into the four priority tiers for 
task resolution.
+
+    Returns ``(task_rules, dag_rules, operator_rules, global_rules)``.
+    """
+    task_rules: list[dict] = []
+    dag_rules: list[dict] = []
+    operator_rules: list[dict] = []
+    global_rules: list[dict] = []
+
+    for rule in rules:
+        if not isinstance(rule, dict):
+            log.warning("emission_policy entry is not a dict: %r; ignoring.", 
rule)
+            continue
+        if not _validate_rule(rule):

Review Comment:
   A "rule" is a raw dict everywhere. The rich structural invariants live 
entirely in _validate_rule, but nothing carries the "validated" guarantee 
downstream, so _classify_rules (line 506) and _classify_dag_rules (line 551) 
re-run validation on every task/dag resolution (listener hot path). This also 
forces two # type: ignore[return-value] casts (lines 603, 643) and the 
cached-list footgun in #9. Introduce a frozen Rule dataclass; make 
_validate_rule the parser; validate once at config-read time. Highest-value 
type improvement.
   Location: utils/emission_policy.py (rule handling throughout; 
double-validation at 506/551; casts at 603/643)
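
   A rough sketch of the suggested shape follows. Rule, parse_rule and 
parse_rules are hypothetical names, and the parser is deliberately simplified: 
it omits the unknown-key, scope-combination and regex checks that 
_validate_rule performs, as well as the WARNING logging. The point is only that 
a Rule instance can exist solely as the output of the parser, so downstream 
code never needs to re-validate.

```python
from __future__ import annotations

from dataclasses import dataclass
from types import MappingProxyType
from typing import Mapping


@dataclass(frozen=True)
class Rule:
    """Hypothetical validated rule; instances only come from parse_rule()."""

    scope: Mapping[str, str]
    controls: Mapping[str, bool]
    match_mode: str = "exact"
    locked: bool = False


def parse_rule(raw: object) -> Rule | None:
    """Simplified parser: return a Rule, or None if the raw entry is invalid."""
    if not isinstance(raw, dict):
        return None
    scope, controls = raw.get("scope"), raw.get("controls")
    if not isinstance(scope, dict) or not isinstance(controls, dict) or not controls:
        return None
    if not all(isinstance(v, bool) for v in controls.values()):
        return None
    match_mode = raw.get("match_mode", "exact")
    if match_mode not in ("exact", "regex"):
        return None
    locked = raw.get("locked", False)
    if not isinstance(locked, bool):
        return None
    # Read-only views over copies, so a cached Rule cannot be mutated by callers.
    return Rule(
        scope=MappingProxyType(dict(scope)),
        controls=MappingProxyType(dict(controls)),
        match_mode=match_mode,
        locked=locked,
    )


def parse_rules(raw_rules: list) -> tuple[Rule, ...]:
    """Validate once at config-read time; downstream code only sees Rule objects."""
    parsed = (parse_rule(entry) for entry in raw_rules)
    return tuple(rule for rule in parsed if rule is not None)
```

   Returning a tuple of frozen objects from the cached config reader would 
also sidestep the shared mutable list, since callers cannot append to or edit 
what they receive.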



##########
providers/openlineage/src/airflow/providers/openlineage/utils/emission_policy.py:
##########
@@ -0,0 +1,1066 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Per-scope emission policy resolution for the OpenLineage provider.
+
+The ``emission_policy`` Airflow configuration option (``[openlineage]`` 
section)
+accepts a JSON array of rule objects.  Each rule has the shape::
+
+    {
+        "scope":      {<scope keys>},        # required, may be empty for 
"global"
+        "match_mode": "exact" | "regex",     # optional, default "exact"
+        "controls":   {<control flags>},     # required, must be non-empty
+        "locked":     true | false           # optional, default false
+    }
+
+Top-level keys outside the four above are rejected with a WARNING and the rule
+is skipped.
+
+Scope keys (all optional inside ``scope``):
+
+- ``operator`` — fully-qualified operator class name; matches task events for 
that
+  operator type.
+- ``dag_id`` — matches task and/or dag run events for all tasks / dag runs 
belonging
+  to the named DAG.  Use ``emit_task_events`` / ``emit_dag_events`` to target 
one
+  event type selectively.
+- ``task_id`` — only valid alongside ``dag_id``; targets a specific task.
+- *(empty ``scope: {}``)* — global default override; applies to every task and 
DAG event.
+
+``operator`` cannot be combined with ``dag_id`` or ``task_id`` (scope is one 
of:
+global, operator-only, dag-only, dag+task).  ``task_id`` without ``dag_id`` and
+``operator`` combined with ``emit_dag_events`` (or ``task_id`` combined with
+``emit_dag_events``) are also rejected with a WARNING.
+
+``match_mode`` lives at the top level: ``"exact"`` (default) or ``"regex"``.
+When set to ``"regex"``, every value inside ``scope`` (``dag_id``, ``task_id``,
+``operator``) is treated as a ``re.fullmatch`` pattern.
+
+Control flag keys (all optional inside ``controls``; the dict must be 
non-empty):
+
+- ``emit`` — shorthand: disable ALL OpenLineage events in scope (both task and 
dag
+  run events).  Default: ``true``.
+- ``emit_task_events`` — disable task-level events only; takes precedence over
+  ``emit`` for task event decisions.  Default: ``true``.
+- ``emit_dag_events`` — disable dag-run-level events only; takes precedence 
over
+  ``emit`` for dag event decisions.  Default: ``true``.
+- ``extract_operator_metadata`` — whether to run operator-specific 
extractor-based metadata
+  collection.  When ``true``, the extractor manager calls the operator's 
OpenLineage extractor
+  (if registered), which may produce dataset inputs/outputs, job facets, run 
facets, and
+  other operator-specific metadata.  When ``false``, the entire extraction 
pipeline is skipped
+  and a minimal event is emitted.  Only meaningful for task events.  Default: 
``true``.
+- ``include_source_code`` — whether to include operator source code in the
+  ``SourceCodeJobFacet`` for Python and Bash operators.  Only meaningful for 
task events
+  when ``extract_operator_metadata`` is also ``true``.  Default: ``true``.
+- ``hook_lineage`` — whether to use ``HookLineageCollector`` as a fallback 
when the
+  extractor finds no inputs/outputs.  **Has no effect when 
``extract_operator_metadata``
+  is ``false``** — the entire extraction pipeline (including hook lineage) is 
skipped.
+  Only meaningful for task events.  Default: ``true``.
+- ``include_full_task_info`` — whether to include the full serialised operator 
state
+  in the ``AirflowRunFacet``.  When ``false`` (default), only a curated subset 
of
+  task attributes is sent.  Only meaningful for task events.  Default: 
``false``.
+
+``locked: true`` is an admin floor lock that prevents per-Dag / per-task 
authoring
+overrides from changing the field(s) carried by this rule's ``controls`` dict.
+
+Flag hierarchy
+~~~~~~~~~~~~~~
+
+Flags are not fully independent — some only take effect when a higher-level 
flag is
+enabled:
+
+- ``extract_operator_metadata: false`` skips the **entire** operator 
extraction pipeline.
+  When this is ``false``, both ``include_source_code`` and ``hook_lineage`` 
have **no
+  effect** regardless of their values, because the code paths they control are 
never
+  reached.
+- ``include_source_code`` only applies inside Python and Bash operator 
extractors; setting
+  it to ``false`` on other operator types is a no-op.
+
+Priority for **task events** (most specific tier wins; within a tier, last 
matching
+rule wins):
+
+1. ``dag_id`` + ``task_id``
+2. ``dag_id``
+3. ``operator``
+4. Global (no match keys)
+5. Built-in defaults
+
+For ``emit`` at the task level: ``emit_task_events`` beats ``emit`` within the 
same
+rule.
+
+Priority for **DAG run events**:
+
+1. ``dag_id`` rule (using ``emit_dag_events`` or ``emit``)
+2. Global (no match keys)
+3. Built-in defaults
+
+For ``emit`` at the dag level: ``emit_dag_events`` beats ``emit`` within the 
same
+rule.
+
+Legacy config translation
+-------------------------
+
+Any active legacy config option (``disabled_for_operators``, 
``disable_source_code``,
+``include_full_task_info``, ``selective_enable``) is **always** translated 
into equivalent
+``emission_policy`` rules, regardless of whether ``emission_policy`` itself is 
set. The
+translated rules are prepended before any user-provided rules, so user rules 
win within
+each priority tier (last-wins). A ``DeprecationWarning`` is issued listing 
every translated
+option — silence it by migrating those options into ``emission_policy`` 
exclusively.
+
+When no legacy option is active and ``emission_policy`` is empty, the resolver 
returns
+the built-in defaults with no warnings.
+
+Audit logging
+-------------
+
+Every resolved field that is non-default — whether from a rule or from a legacy
+translation — is logged at INFO level, identifying the field, the event 
context,
+and the exact rule that caused the change.
+"""
+
+from __future__ import annotations
+
+import logging
+import re
+import warnings
+from dataclasses import dataclass, replace
+from typing import TYPE_CHECKING
+
+from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.providers.openlineage import conf as _ol_conf
+
+if TYPE_CHECKING:
+    from airflow.providers.openlineage.utils.utils import AnyOperator
+
+log = logging.getLogger(__name__)
+
+# Sentinel used during field resolution — distinct from ``False``.
+_UNSET = object()
+
+# Control flag names — the keys that may appear inside a rule's ``controls`` 
dict.
+EMIT = "emit"
+EMIT_TASK_EVENTS = "emit_task_events"
+EMIT_DAG_EVENTS = "emit_dag_events"
+EXTRACT_OPERATOR_METADATA = "extract_operator_metadata"
+INCLUDE_SOURCE_CODE = "include_source_code"
+HOOK_LINEAGE = "hook_lineage"
+INCLUDE_FULL_TASK_INFO = "include_full_task_info"
+
+# Scope keys — the keys that may appear inside a rule's ``scope`` dict.
+SCOPE_DAG_ID = "dag_id"
+SCOPE_TASK_ID = "task_id"
+SCOPE_OPERATOR = "operator"
+
+# Top-level rule keys.
+RULE_SCOPE = "scope"
+RULE_CONTROLS = "controls"
+RULE_MATCH_MODE = "match_mode"
+RULE_LOCKED = "locked"
+
+# Param names used by the authoring API to store flow-control flags on 
operator / DAG
+# objects.  Kept in this module (rather than the api module) so the resolver 
can read
+# them without a circular import.
+OL_EMISSION_POLICY_TASK_PARAM = "_ol_emission_policy"
+OL_EMISSION_POLICY_DAG_PARAM = "_ol_emission_policy_dag"
+
+# Schema enforcement: only these keys may appear at each level.  Unknown keys 
cause
+# the rule to be skipped with a WARNING (catches typos like {"scope": 
{"dgg_id": "x"}}).
+_ALLOWED_TOP_LEVEL_KEYS: frozenset[str] = frozenset({RULE_SCOPE, 
RULE_MATCH_MODE, RULE_CONTROLS, RULE_LOCKED})
+_ALLOWED_SCOPE_KEYS: frozenset[str] = frozenset({SCOPE_DAG_ID, SCOPE_TASK_ID, 
SCOPE_OPERATOR})
+_ALLOWED_CONTROL_KEYS: frozenset[str] = frozenset(
+    {
+        EMIT,
+        EMIT_TASK_EVENTS,
+        EMIT_DAG_EVENTS,
+        EXTRACT_OPERATOR_METADATA,
+        INCLUDE_SOURCE_CODE,
+        HOOK_LINEAGE,
+        INCLUDE_FULL_TASK_INFO,
+    }
+)
+
+# Fields scanned for ``locked: true`` on task-event resolution.
+# ``emit`` is intentionally absent: emit-locking is driven by either ``emit`` 
or
+# ``emit_task_events`` appearing in a locked rule (handled separately in
+# :func:`_compute_locked_task_fields`), so listing it here would double-count.
+_LOCKABLE_TASK_FIELDS: tuple[str, ...] = (
+    EXTRACT_OPERATOR_METADATA,
+    INCLUDE_SOURCE_CODE,
+    HOOK_LINEAGE,
+    INCLUDE_FULL_TASK_INFO,
+)
+
+# Authoring flag classification: which keys are relevant for task-level vs 
DAG-run
+# resolution.  Used by the authoring API to split a single call into the two 
stored
+# param dicts.
+_TASK_FLAG_KEYS: frozenset[str] = frozenset(
+    {
+        EMIT,
+        EMIT_TASK_EVENTS,
+        EXTRACT_OPERATOR_METADATA,
+        INCLUDE_SOURCE_CODE,
+        HOOK_LINEAGE,
+        INCLUDE_FULL_TASK_INFO,
+    }
+)
+_DAG_FLAG_KEYS: frozenset[str] = frozenset({EMIT, EMIT_DAG_EVENTS})
+
+# Audit-log direction tables.  Fields default-true → log on transition to 
false;
+# fields default-false → log on transition to true.
+_AUDIT_DISABLE_FIELDS: frozenset[str] = frozenset(
+    {EMIT, EXTRACT_OPERATOR_METADATA, INCLUDE_SOURCE_CODE, HOOK_LINEAGE}
+)
+_AUDIT_ENABLE_FIELDS: frozenset[str] = frozenset({INCLUDE_FULL_TASK_INFO})
+
+
+@dataclass(frozen=True)
+class EmissionPolicy:
+    """Resolved emission policy for a specific event context."""
+
+    emit: bool
+    extract_operator_metadata: bool
+    include_source_code: bool
+    hook_lineage: bool
+    include_full_task_info: bool
+
+    @classmethod
+    def defaults(cls) -> EmissionPolicy:
+        """Return the default policy (all controls at their built-in 
defaults)."""
+        return cls(
+            emit=True,
+            extract_operator_metadata=True,
+            include_source_code=True,
+            hook_lineage=True,
+            include_full_task_info=False,
+        )
+
+
+def _matches(pattern: str, value: str, match_mode: str) -> bool:
+    """Return ``True`` if *value* matches *pattern* according to 
*match_mode*."""
+    if match_mode == "regex":
+        return bool(re.fullmatch(pattern, value))
+    return pattern == value
+
+
+def _read_param(obj: object, param_name: str) -> dict[str, bool]:
+    """Read the flags dict stored at *param_name* on *obj*, or ``{}`` if 
absent."""
+    from airflow.providers.common.compat.sdk import Param
+
+    val = getattr(obj, "params", {}).get(param_name)
+    if val is None:
+        return {}
+    if isinstance(val, Param):
+        val = val.value
+    return val if isinstance(val, dict) else {}
+
+
+def _merge_param(obj: object, param_name: str, new_flags: dict[str, bool]) -> None:
+    """
+    Merge *new_flags* into the ``param_name`` param on *obj*, creating it if absent.
+
+    The public entry point
+    (:func:`~airflow.providers.openlineage.api.emission_policy.extend_global_openlineage_emission_policy`)
+    validates that *obj* is a DAG or task-like object before reaching here, so a missing
+    ``params`` attribute indicates a programmer error (e.g. an incorrectly-mocked object
+    in tests) — we raise loud rather than silently dropping flags.
+    """
+    from airflow.providers.common.compat.sdk import Param
+
+    params = getattr(obj, "params", None)
+    if params is None:
+        raise TypeError(
+            f"extend_global_openlineage_emission_policy: {type(obj).__name__} instance has no "
+            "'params' attribute — cannot store flow-control flags. "
+            "Pass a DAG, an Operator, or an XComArg."
+        )
+    existing = params.get(param_name)
+    if existing is not None:
+        existing_val = existing.value if isinstance(existing, Param) else existing
+        if isinstance(existing_val, dict):
+            new_flags = {**existing_val, **new_flags}
+    params[param_name] = Param(new_flags)
+
+
+def _log_disabled(field: str, context: str, rule: dict | None) -> None:

Review Comment:
   `_log_disabled(field, context, rule)` has an `else` branch (taken when
   `rule` is `None`) that logs a message saying the field was suppressed
   because the "selective_enable opt-in not detected". Every call site invokes
   it only for a field whose built-in default is the "enabled" direction, and
   only when the resolved value is non-default. That means a matching rule
   always exists, so `rule` is never `None`. The branch is dead, and its
   message refers to `selective_enable` semantics that this generic resolver
   no longer implements. A maintainer reading the code will believe some path
   emits that message; none does. Either remove the branch (and make `rule`
   non-optional) or reword the message as a generic defensive fallback.



##########
providers/openlineage/src/airflow/providers/openlineage/utils/emission_policy.py:
##########
@@ -0,0 +1,1066 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Per-scope emission policy resolution for the OpenLineage provider.
+
+The ``emission_policy`` Airflow configuration option (``[openlineage]`` 
section)
+accepts a JSON array of rule objects.  Each rule has the shape::
+
+    {
+        "scope":      {<scope keys>},        # required, may be empty for 
"global"
+        "match_mode": "exact" | "regex",     # optional, default "exact"
+        "controls":   {<control flags>},     # required, must be non-empty
+        "locked":     true | false           # optional, default false
+    }
+
+Top-level keys outside the four above are rejected with a WARNING and the rule
+is skipped.
+
+Scope keys (all optional inside ``scope``):
+
+- ``operator`` — fully-qualified operator class name; matches task events for 
that
+  operator type.
+- ``dag_id`` — matches task and/or dag run events for all tasks / dag runs 
belonging
+  to the named DAG.  Use ``emit_task_events`` / ``emit_dag_events`` to target 
one
+  event type selectively.
+- ``task_id`` — only valid alongside ``dag_id``; targets a specific task.
+- *(empty ``scope: {}``)* — global default override; applies to every task and 
DAG event.
+
+``operator`` cannot be combined with ``dag_id`` or ``task_id`` (scope is one 
of:
+global, operator-only, dag-only, dag+task).  ``task_id`` without ``dag_id`` and
+``operator`` combined with ``emit_dag_events`` (or ``task_id`` combined with
+``emit_dag_events``) are also rejected with a WARNING.
+
+``match_mode`` lives at the top level: ``"exact"`` (default) or ``"regex"``.
+When set to ``"regex"``, every value inside ``scope`` (``dag_id``, ``task_id``,
+``operator``) is treated as a ``re.fullmatch`` pattern.
+
+Control flag keys (all optional inside ``controls``; the dict must be 
non-empty):
+
+- ``emit`` — shorthand: disable ALL OpenLineage events in scope (both task and 
dag
+  run events).  Default: ``true``.
+- ``emit_task_events`` — disable task-level events only; takes precedence over
+  ``emit`` for task event decisions.  Default: ``true``.
+- ``emit_dag_events`` — disable dag-run-level events only; takes precedence 
over
+  ``emit`` for dag event decisions.  Default: ``true``.
+- ``extract_operator_metadata`` — whether to run operator-specific 
extractor-based metadata
+  collection.  When ``true``, the extractor manager calls the operator's 
OpenLineage extractor
+  (if registered), which may produce dataset inputs/outputs, job facets, run 
facets, and
+  other operator-specific metadata.  When ``false``, the entire extraction 
pipeline is skipped
+  and a minimal event is emitted.  Only meaningful for task events.  Default: 
``true``.
+- ``include_source_code`` — whether to include operator source code in the
+  ``SourceCodeJobFacet`` for Python and Bash operators.  Only meaningful for 
task events
+  when ``extract_operator_metadata`` is also ``true``.  Default: ``true``.
+- ``hook_lineage`` — whether to use ``HookLineageCollector`` as a fallback 
when the
+  extractor finds no inputs/outputs.  **Has no effect when 
``extract_operator_metadata``
+  is ``false``** — the entire extraction pipeline (including hook lineage) is 
skipped.
+  Only meaningful for task events.  Default: ``true``.
+- ``include_full_task_info`` — whether to include the full serialised operator 
state
+  in the ``AirflowRunFacet``.  When ``false`` (default), only a curated subset 
of
+  task attributes is sent.  Only meaningful for task events.  Default: 
``false``.
+
+``locked: true`` is an admin floor lock that prevents per-Dag / per-task 
authoring
+overrides from changing the field(s) carried by this rule's ``controls`` dict.
+
+Flag hierarchy
+~~~~~~~~~~~~~~
+
+Flags are not fully independent — some only take effect when a higher-level 
flag is
+enabled:
+
+- ``extract_operator_metadata: false`` skips the **entire** operator 
extraction pipeline.
+  When this is ``false``, both ``include_source_code`` and ``hook_lineage`` 
have **no
+  effect** regardless of their values, because the code paths they control are 
never
+  reached.
+- ``include_source_code`` only applies inside Python and Bash operator 
extractors; setting
+  it to ``false`` on other operator types is a no-op.
+
+Priority for **task events** (most specific tier wins; within a tier, last 
matching
+rule wins):
+
+1. ``dag_id`` + ``task_id``
+2. ``dag_id``
+3. ``operator``
+4. Global (no match keys)
+5. Built-in defaults
+
+For ``emit`` at the task level: ``emit_task_events`` beats ``emit`` within the 
same
+rule.
+
+Priority for **DAG run events**:
+
+1. ``dag_id`` rule (using ``emit_dag_events`` or ``emit``)
+2. Global (no match keys)
+3. Built-in defaults
+
+For ``emit`` at the dag level: ``emit_dag_events`` beats ``emit`` within the 
same
+rule.
+
+Legacy config translation
+-------------------------
+
+Any active legacy config option (``disabled_for_operators``, 
``disable_source_code``,
+``include_full_task_info``, ``selective_enable``) is **always** translated 
into equivalent
+``emission_policy`` rules, regardless of whether ``emission_policy`` itself is 
set. The
+translated rules are prepended before any user-provided rules, so user rules 
win within
+each priority tier (last-wins). A ``DeprecationWarning`` is issued listing 
every translated
+option — silence it by migrating those options into ``emission_policy`` 
exclusively.
+
+When no legacy option is active and ``emission_policy`` is empty, the resolver 
returns
+the built-in defaults with no warnings.
+
+Audit logging
+-------------
+
+Every resolved field that is non-default — whether from a rule or from a legacy
+translation — is logged at INFO level, identifying the field, the event 
context,
+and the exact rule that caused the change.
+"""
+
+from __future__ import annotations
+
+import logging
+import re
+import warnings
+from dataclasses import dataclass, replace
+from typing import TYPE_CHECKING
+
+from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.providers.openlineage import conf as _ol_conf
+
+if TYPE_CHECKING:
+    from airflow.providers.openlineage.utils.utils import AnyOperator
+
+log = logging.getLogger(__name__)
+
+# Sentinel used during field resolution — distinct from ``False``.
+_UNSET = object()
+
+# Control flag names — the keys that may appear inside a rule's ``controls`` 
dict.
+EMIT = "emit"
+EMIT_TASK_EVENTS = "emit_task_events"
+EMIT_DAG_EVENTS = "emit_dag_events"
+EXTRACT_OPERATOR_METADATA = "extract_operator_metadata"
+INCLUDE_SOURCE_CODE = "include_source_code"
+HOOK_LINEAGE = "hook_lineage"
+INCLUDE_FULL_TASK_INFO = "include_full_task_info"
+
+# Scope keys — the keys that may appear inside a rule's ``scope`` dict.
+SCOPE_DAG_ID = "dag_id"
+SCOPE_TASK_ID = "task_id"
+SCOPE_OPERATOR = "operator"
+
+# Top-level rule keys.
+RULE_SCOPE = "scope"
+RULE_CONTROLS = "controls"
+RULE_MATCH_MODE = "match_mode"
+RULE_LOCKED = "locked"
+
+# Param names used by the authoring API to store flow-control flags on 
operator / DAG
+# objects.  Kept in this module (rather than the api module) so the resolver 
can read
+# them without a circular import.
+OL_EMISSION_POLICY_TASK_PARAM = "_ol_emission_policy"
+OL_EMISSION_POLICY_DAG_PARAM = "_ol_emission_policy_dag"
+
+# Schema enforcement: only these keys may appear at each level.  Unknown keys 
cause
+# the rule to be skipped with a WARNING (catches typos like {"scope": 
{"dgg_id": "x"}}).
+_ALLOWED_TOP_LEVEL_KEYS: frozenset[str] = frozenset({RULE_SCOPE, 
RULE_MATCH_MODE, RULE_CONTROLS, RULE_LOCKED})
+_ALLOWED_SCOPE_KEYS: frozenset[str] = frozenset({SCOPE_DAG_ID, SCOPE_TASK_ID, 
SCOPE_OPERATOR})
+_ALLOWED_CONTROL_KEYS: frozenset[str] = frozenset(
+    {
+        EMIT,
+        EMIT_TASK_EVENTS,
+        EMIT_DAG_EVENTS,
+        EXTRACT_OPERATOR_METADATA,
+        INCLUDE_SOURCE_CODE,
+        HOOK_LINEAGE,
+        INCLUDE_FULL_TASK_INFO,
+    }
+)
+
+# Fields scanned for ``locked: true`` on task-event resolution.
+# ``emit`` is intentionally absent: emit-locking is driven by either ``emit`` 
or
+# ``emit_task_events`` appearing in a locked rule (handled separately in
+# :func:`_compute_locked_task_fields`), so listing it here would double-count.
+_LOCKABLE_TASK_FIELDS: tuple[str, ...] = (
+    EXTRACT_OPERATOR_METADATA,
+    INCLUDE_SOURCE_CODE,
+    HOOK_LINEAGE,
+    INCLUDE_FULL_TASK_INFO,
+)
+
+# Authoring flag classification: which keys are relevant for task-level vs 
DAG-run
+# resolution.  Used by the authoring API to split a single call into the two 
stored
+# param dicts.
+_TASK_FLAG_KEYS: frozenset[str] = frozenset(
+    {
+        EMIT,
+        EMIT_TASK_EVENTS,
+        EXTRACT_OPERATOR_METADATA,
+        INCLUDE_SOURCE_CODE,
+        HOOK_LINEAGE,
+        INCLUDE_FULL_TASK_INFO,
+    }
+)
+_DAG_FLAG_KEYS: frozenset[str] = frozenset({EMIT, EMIT_DAG_EVENTS})
+
+# Audit-log direction tables.  Fields default-true → log on transition to 
false;
+# fields default-false → log on transition to true.
+_AUDIT_DISABLE_FIELDS: frozenset[str] = frozenset(
+    {EMIT, EXTRACT_OPERATOR_METADATA, INCLUDE_SOURCE_CODE, HOOK_LINEAGE}
+)
+_AUDIT_ENABLE_FIELDS: frozenset[str] = frozenset({INCLUDE_FULL_TASK_INFO})
+
+
+@dataclass(frozen=True)
+class EmissionPolicy:
+    """Resolved emission policy for a specific event context."""
+
+    emit: bool
+    extract_operator_metadata: bool
+    include_source_code: bool
+    hook_lineage: bool
+    include_full_task_info: bool
+
+    @classmethod
+    def defaults(cls) -> EmissionPolicy:
+        """Return the default policy (all controls at their built-in 
defaults)."""
+        return cls(
+            emit=True,
+            extract_operator_metadata=True,
+            include_source_code=True,
+            hook_lineage=True,
+            include_full_task_info=False,
+        )
+
+
+def _matches(pattern: str, value: str, match_mode: str) -> bool:
+    """Return ``True`` if *value* matches *pattern* according to 
*match_mode*."""
+    if match_mode == "regex":
+        return bool(re.fullmatch(pattern, value))
+    return pattern == value
+
+
+def _read_param(obj: object, param_name: str) -> dict[str, bool]:
+    """Read the flags dict stored at *param_name* on *obj*, or ``{}`` if 
absent."""
+    from airflow.providers.common.compat.sdk import Param
+
+    val = getattr(obj, "params", {}).get(param_name)
+    if val is None:
+        return {}
+    if isinstance(val, Param):
+        val = val.value
+    return val if isinstance(val, dict) else {}
+
+
+def _merge_param(obj: object, param_name: str, new_flags: dict[str, bool]) -> None:
+    """
+    Merge *new_flags* into the ``param_name`` param on *obj*, creating it if absent.
+
+    The public entry point
+    (:func:`~airflow.providers.openlineage.api.emission_policy.extend_global_openlineage_emission_policy`)
+    validates that *obj* is a DAG or task-like object before reaching here, so a missing
+    ``params`` attribute indicates a programmer error (e.g. an incorrectly-mocked object
+    in tests) — we raise loud rather than silently dropping flags.
+    """
+    from airflow.providers.common.compat.sdk import Param
+
+    params = getattr(obj, "params", None)
+    if params is None:
+        raise TypeError(
+            f"extend_global_openlineage_emission_policy: {type(obj).__name__} instance has no "
+            "'params' attribute — cannot store flow-control flags. "
+            "Pass a DAG, an Operator, or an XComArg."
+        )
+    existing = params.get(param_name)
+    if existing is not None:
+        existing_val = existing.value if isinstance(existing, Param) else existing
+        if isinstance(existing_val, dict):
+            new_flags = {**existing_val, **new_flags}
+    params[param_name] = Param(new_flags)
+
+
+def _log_disabled(field: str, context: str, rule: dict | None) -> None:
+    """Log an INFO-level audit message when a control field is non-default."""
+    if rule is not None:
+        log.info(
+            "emission_policy: '%s' disabled for %s — matched by rule: %r",
+            field,
+            context,
+            rule,
+        )
+    else:
+        log.info(
+            "emission_policy: '%s' suppressed for %s — selective_enable opt-in not detected",
+            field,
+            context,
+        )
+
+
+def _log_enabled(field: str, context: str, rule: dict | None) -> None:
+    """Log an INFO-level audit message when a normally-false field is enabled by a rule."""
+    if rule is not None:
+        log.info(
+            "emission_policy: '%s' enabled for %s — matched by rule: %r",
+            field,
+            context,
+            rule,
+        )
+
+
+def _audit_log_updates(updates: dict[str, bool], context: str, source: dict) -> None:
+    """Audit-log every non-default value introduced by *updates* in one pass."""
+    for field, value in updates.items():
+        if field in _AUDIT_DISABLE_FIELDS and not value:
+            _log_disabled(field, context, source)
+        elif field in _AUDIT_ENABLE_FIELDS and value:
+            _log_enabled(field, context, source)
+
+
+def _validate_rule(rule: dict) -> bool:
+    """
+    Return ``True`` if the rule matches the nested schema; warn and return 
``False`` otherwise.
+
+    A valid rule has exactly these top-level keys: ``scope`` (dict, possibly 
empty),
+    ``controls`` (non-empty dict), and the optionals ``match_mode`` / 
``locked``.
+    Unknown keys at any level cause the rule to be skipped — that catches user
+    typos like ``{"scope": {"dgg_id": "x"}}``.
+    """
+    unknown_top = set(rule) - _ALLOWED_TOP_LEVEL_KEYS
+    if unknown_top:
+        log.warning(
+            "emission_policy rule has unknown top-level key(s) %s (allowed: 
%s); ignoring: %r",
+            sorted(unknown_top),
+            sorted(_ALLOWED_TOP_LEVEL_KEYS),
+            rule,
+        )
+        return False
+
+    if RULE_SCOPE not in rule:
+        log.warning(
+            "emission_policy rule is missing required 'scope' key (use 
'scope': {} for global); ignoring: %r",
+            rule,
+        )
+        return False
+    scope = rule[RULE_SCOPE]
+    if not isinstance(scope, dict):
+        log.warning(
+            "emission_policy rule 'scope' must be a dict (use {} for global), 
got %r; ignoring: %r",
+            type(scope).__name__,
+            rule,
+        )
+        return False
+    unknown_scope = set(scope) - _ALLOWED_SCOPE_KEYS
+    if unknown_scope:
+        log.warning(
+            "emission_policy rule 'scope' has unknown key(s) %s (allowed: %s); 
ignoring: %r",
+            sorted(unknown_scope),
+            sorted(_ALLOWED_SCOPE_KEYS),
+            rule,
+        )
+        return False
+    has_dag = SCOPE_DAG_ID in scope
+    has_task = SCOPE_TASK_ID in scope
+    has_op = SCOPE_OPERATOR in scope
+    if has_task and not has_dag:
+        log.warning(
+            "emission_policy rule scope has 'task_id' without 'dag_id'; 
ignoring: %r",
+            rule,
+        )
+        return False
+    if has_op and (has_dag or has_task):
+        log.warning(
+            "emission_policy rule scope combines 'operator' with 
'dag_id'/'task_id' "
+            "(must be exactly one of: global, operator-only, dag-only, 
dag+task); ignoring: %r",
+            rule,
+        )
+        return False
+
+    if RULE_CONTROLS not in rule:
+        log.warning(
+            "emission_policy rule is missing required 'controls' dict; 
ignoring: %r",
+            rule,
+        )
+        return False
+    controls = rule[RULE_CONTROLS]
+    if not isinstance(controls, dict):
+        log.warning(
+            "emission_policy rule 'controls' must be a dict, got %r; ignoring: 
%r",
+            type(controls).__name__,
+            rule,
+        )
+        return False
+    if not controls:
+        log.warning(
+            "emission_policy rule has empty 'controls' dict "
+            "(at least one control flag is required); ignoring: %r",
+            rule,
+        )
+        return False
+    unknown_controls = set(controls) - _ALLOWED_CONTROL_KEYS
+    if unknown_controls:
+        log.warning(
+            "emission_policy rule 'controls' has unknown key(s) %s (allowed: 
%s); ignoring: %r",
+            sorted(unknown_controls),
+            sorted(_ALLOWED_CONTROL_KEYS),
+            rule,
+        )
+        return False
+    for k, v in controls.items():
+        if not isinstance(v, bool):
+            log.warning(
+                "emission_policy rule 'controls.%s' must be bool, got %r; 
ignoring: %r",
+                k,
+                v,
+                rule,
+            )
+            return False
+
+    if has_op and EMIT_DAG_EVENTS in controls:
+        log.warning(
+            "emission_policy rule has scope 'operator' with controls 
'emit_dag_events' which is meaningless "
+            "(operators are not associated with DAG run events); ignoring: %r",
+            rule,
+        )
+        return False
+    if has_task and EMIT_DAG_EVENTS in controls:
+        log.warning(
+            "emission_policy rule has scope 'task_id' with controls 
'emit_dag_events' which is meaningless "
+            "(task-specific rules do not target DAG run events); ignoring: %r",
+            rule,
+        )
+        return False
+
+    match_mode = rule.get(RULE_MATCH_MODE, "exact")
+    if match_mode not in ("exact", "regex"):
+        log.warning(
+            "emission_policy rule has invalid 'match_mode' %r (must be 'exact' 
or 'regex'); ignoring: %r",
+            match_mode,
+            rule,
+        )
+        return False
+    if match_mode == "regex":
+        for key in (SCOPE_DAG_ID, SCOPE_TASK_ID, SCOPE_OPERATOR):
+            if key in scope:
+                try:
+                    re.compile(scope[key])
+                except re.error as exc:
+                    log.warning(
+                        "emission_policy rule scope.'%s' pattern %r is not a 
valid regex (%s); ignoring: %r",
+                        key,
+                        scope[key],
+                        exc,
+                        rule,
+                    )
+                    return False
+
+    if RULE_LOCKED in rule and not isinstance(rule[RULE_LOCKED], bool):
+        log.warning(
+            "emission_policy rule 'locked' must be bool, got %r; ignoring: %r",
+            rule[RULE_LOCKED],
+            rule,
+        )
+        return False
+    # Under the nested schema every allowed control key is lockable 
(validation already
+    # requires non-empty controls with keys from the allowed set), so there is 
no
+    # "lock with no lockable field" warning case to raise.
+
+    return True
+
+
+def _classify_rules(
+    rules: list[dict],
+    fqcn: str,
+    dag_id: str,
+    task_id: str,
+) -> tuple[list[dict], list[dict], list[dict], list[dict]]:
+    """
+    Classify a flat list of validated rules into the four priority tiers for 
task resolution.
+
+    Returns ``(task_rules, dag_rules, operator_rules, global_rules)``.
+    """
+    task_rules: list[dict] = []
+    dag_rules: list[dict] = []
+    operator_rules: list[dict] = []
+    global_rules: list[dict] = []
+
+    for rule in rules:
+        if not isinstance(rule, dict):
+            log.warning("emission_policy entry is not a dict: %r; ignoring.", 
rule)
+            continue
+        if not _validate_rule(rule):
+            continue
+
+        scope = rule[RULE_SCOPE]
+        match_mode = rule.get(RULE_MATCH_MODE, "exact")
+        has_dag = SCOPE_DAG_ID in scope
+        has_task = SCOPE_TASK_ID in scope
+        has_op = SCOPE_OPERATOR in scope
+
+        if has_dag and has_task:
+            if _matches(scope[SCOPE_DAG_ID], dag_id, match_mode) and _matches(
+                scope[SCOPE_TASK_ID], task_id, match_mode
+            ):
+                task_rules.append(rule)
+        elif has_dag:
+            if _matches(scope[SCOPE_DAG_ID], dag_id, match_mode):
+                dag_rules.append(rule)
+        elif has_op:
+            if _matches(scope[SCOPE_OPERATOR], fqcn, match_mode):
+                operator_rules.append(rule)
+        else:
+            global_rules.append(rule)
+
+    return task_rules, dag_rules, operator_rules, global_rules
+
+
+def _classify_dag_rules(
+    rules: list[dict],
+    dag_id: str,
+) -> tuple[list[dict], list[dict]]:
+    """
+    Classify a flat list of validated rules into the two priority tiers for 
dag event resolution.
+
+    Only rules with empty ``scope`` (global) and rules with exactly 
``{dag_id}``
+    contribute to dag-event resolution. Operator-scoped and task-scoped rules
+    never affect dag-run events.
+
+    Returns ``(dag_rules, global_rules)``.
+    """
+    dag_rules: list[dict] = []
+    global_rules: list[dict] = []
+
+    for rule in rules:
+        if not isinstance(rule, dict):
+            continue
+        if not _validate_rule(rule):
+            continue
+
+        scope = rule[RULE_SCOPE]
+        match_mode = rule.get(RULE_MATCH_MODE, "exact")
+        has_dag = SCOPE_DAG_ID in scope
+        has_task = SCOPE_TASK_ID in scope
+        has_op = SCOPE_OPERATOR in scope
+
+        if has_dag and not has_task and not has_op:
+            if _matches(scope[SCOPE_DAG_ID], dag_id, match_mode):
+                dag_rules.append(rule)
+        elif not (has_dag or has_task or has_op):  # global = empty scope dict
+            global_rules.append(rule)
+
+    return dag_rules, global_rules
+
+
+def _resolve_field_with_source(
+    tiers: list[list[dict]], field: str, default: bool
+) -> tuple[bool, dict | None]:
+    """
+    Walk priority tiers from most specific to least specific.
+
+    Within a tier the *last* rule that sets ``field`` (inside its ``controls`` 
dict)
+    wins. Returns ``(default, None)`` if no rule in any tier specifies the 
field;
+    the second element is the rule dict that provided the winning value, or
+    ``None`` when the built-in default was used.
+
+    A WARNING is logged when two rules in the *same* tier set the same field to
+    *different* values — the later rule still wins, but the conflict is 
surfaced
+    so operators can spot accidental contradictions.
+    """
+    for tier_rules in tiers:
+        value = _UNSET
+        winning_rule: dict | None = None
+        for rule in tier_rules:
+            controls = rule[RULE_CONTROLS]
+            if field in controls:
+                v = controls[field]
+                if value is not _UNSET and value != v:
+                    log.warning(
+                        "emission_policy: field '%s' has contradictory values 
in the same "
+                        "priority tier (using last value %r); conflicting 
rules: %r and %r",
+                        field,
+                        v,
+                        winning_rule,
+                        rule,
+                    )
+                value = v
+                winning_rule = rule
+        if value is not _UNSET:
+            return value, winning_rule  # type: ignore[return-value]
+    return default, None
+
+
+def _resolve_emit_with_source(
+    tiers: list[list[dict]],
+    scope: str,
+    default: bool,
+) -> tuple[bool, dict | None]:
+    """
+    Resolve the effective ``emit`` decision for the given *scope*.
+
+    ``scope`` must be ``"task"`` or ``"dag"``.  Within each rule, the
+    scope-specific key (``emit_task_events`` / ``emit_dag_events``) takes
+    precedence over the generic ``emit`` shorthand.
+    """
+    specific_key = EMIT_TASK_EVENTS if scope == "task" else EMIT_DAG_EVENTS
+    for tier_rules in tiers:
+        value = _UNSET
+        winning_rule: dict | None = None
+        for rule in tier_rules:
+            controls = rule[RULE_CONTROLS]
+            new_v: bool | None = None
+            if specific_key in controls:
+                new_v = controls[specific_key]
+            elif EMIT in controls:
+                new_v = controls[EMIT]
+            if new_v is not None:
+                if value is not _UNSET and value != new_v:
+                    log.warning(
+                        "emission_policy: field 'emit' (%s) has contradictory 
values in the "
+                        "same priority tier (using last value %r); conflicting 
rules: %r and %r",
+                        scope,
+                        new_v,
+                        winning_rule,
+                        rule,
+                    )
+                value = new_v
+                winning_rule = rule
+        if value is not _UNSET:
+            return value, winning_rule  # type: ignore[return-value]
+    return default, None
+
+
+def _synthesize_legacy_rules_task(
+    operator: AnyOperator,
+    dag_id: str,
+    task_id: str,
+) -> list[dict]:
+    """
+    Translate legacy config options into equivalent ``emission_policy`` rules 
for task resolution.
+
+    The returned rules are intended to be **prepended** to the user-provided 
rules so that
+    user rules (appended later) win within each tier (last-wins within a tier).
+    """
+    from airflow.providers.openlineage import conf
+
+    rules: list[dict] = []
+
+    # disabled_for_operators → operator-scope emit:false rules
+    for fqcn in conf.disabled_operators():
+        rules.append({RULE_SCOPE: {SCOPE_OPERATOR: fqcn}, RULE_CONTROLS: 
{EMIT: False}})
+
+    # disable_source_code → global include_source_code:false
+    if not conf.is_source_enabled():
+        rules.append({RULE_SCOPE: {}, RULE_CONTROLS: {INCLUDE_SOURCE_CODE: 
False}})
+
+    # include_full_task_info → global include_full_task_info:true (only when 
non-default True)
+    if conf.include_full_task_info():
+        rules.append({RULE_SCOPE: {}, RULE_CONTROLS: {INCLUDE_FULL_TASK_INFO: 
True}})
+
+    # selective_enable → global emit:false baseline + per-task opt-in rule
+    if conf.selective_enable():
+        rules.append({RULE_SCOPE: {}, RULE_CONTROLS: {EMIT: False}})  # global 
baseline
+        try:
+            from airflow.providers.openlineage.utils.selective_enable import 
is_task_lineage_enabled
+
+            if is_task_lineage_enabled(operator):
+                # Task is explicitly opted in → task-tier rule that overrides 
the global baseline.
+                rules.append(
+                    {
+                        RULE_SCOPE: {SCOPE_DAG_ID: dag_id, SCOPE_TASK_ID: 
task_id},
+                        RULE_CONTROLS: {EMIT: True},
+                    }
+                )
+        except Exception:
+            # Non-standard operator types may not expose params correctly. 
Surface at debug
+            # level so the failure is discoverable without spamming production 
logs.
+            log.debug(
+                "selective_enable translation: is_task_lineage_enabled() 
failed for task '%s'",
+                task_id,
+                exc_info=True,
+            )
+
+    return rules
+
+
+def _synthesize_legacy_rules_dag(
+    dag_id: str,
+    dag: object | None,
+) -> list[dict]:
+    """Translate legacy config options into equivalent ``emission_policy`` 
rules for dag event resolution."""
+    from airflow.providers.openlineage import conf
+
+    rules: list[dict] = []
+
+    if conf.selective_enable() and dag is not None:
+        rules.append({RULE_SCOPE: {}, RULE_CONTROLS: {EMIT: False}})  # global 
baseline
+        try:
+            from airflow.providers.openlineage.utils.selective_enable import 
is_dag_lineage_enabled
+
+            if is_dag_lineage_enabled(dag):  # type: ignore[arg-type]
+                rules.append({RULE_SCOPE: {SCOPE_DAG_ID: dag_id}, 
RULE_CONTROLS: {EMIT: True}})
+        except Exception:
+            log.debug(
+                "selective_enable translation: is_dag_lineage_enabled() failed 
for dag '%s'",
+                dag_id,
+                exc_info=True,
+            )
+
+    return rules
+
+
+def _warn_legacy_with_emission_policy(legacy_rules: list[dict], scope: str) -> 
None:
+    """
+    Issue a DeprecationWarning listing legacy options that were translated to 
rules.
+
+    *scope* is ``"task"`` or ``"dag"``: a task-scope warning enumerates every 
legacy
+    option (``disabled_for_operators``, ``disable_source_code``,
+    ``include_full_task_info``, ``selective_enable``); a dag-scope warning only
+    mentions ``selective_enable`` because the other three are task-only.
+    """
+    if not legacy_rules:
+        return
+
+    from airflow.providers.openlineage import conf
+
+    parts: list[str] = []
+    if scope == "task":
+        if conf.disabled_operators():
+            translated = [
+                {RULE_SCOPE: {SCOPE_OPERATOR: f}, RULE_CONTROLS: {EMIT: False}}
+                for f in conf.disabled_operators()
+            ]
+            parts.append(f"  - disabled_for_operators → {translated}")
+        if not conf.is_source_enabled():
+            parts.append(
+                '  - disable_source_code → [{"scope": {}, "controls": 
{"include_source_code": false}}]'
+            )
+        if conf.include_full_task_info():
+            parts.append(
+                '  - include_full_task_info → [{"scope": {}, "controls": 
{"include_full_task_info": true}}]'
+            )
+        if conf.selective_enable():
+            parts.append(
+                "  - selective_enable → "
+                '[{"scope": {}, "controls": {"emit": false}}] (global 
baseline) '
+                "+ per-task opt-in rules injected at runtime from 
enable_lineage()/disable_lineage()"
+            )
+    else:  # dag scope: only selective_enable contributes legacy dag-event 
rules
+        if not conf.selective_enable():
+            return
+        parts.append(
+            "  - selective_enable → "
+            '[{"scope": {}, "controls": {"emit": false}}] (global baseline) '
+            "+ dag opt-in rules injected at runtime from 
enable_lineage()/disable_lineage()"
+        )
+
+    if not parts:
+        return
+
+    warnings.warn(
+        "[openlineage] One or more legacy config options are set and have been 
translated\n"
+        "into equivalent 'emission_policy' rules:\n" + "\n".join(parts) + "\n"
+        "Migrate to 'emission_policy' exclusively to silence this warning. "
+        "These legacy options will be removed in a future version.",
+        AirflowProviderDeprecationWarning,
+        stacklevel=4,
+    )
+
+
+def _compute_locked_task_fields(
+    task_rules: list[dict],
+    dag_rules: list[dict],
+    operator_rules: list[dict],
+    global_rules: list[dict],
+) -> frozenset[str]:
+    """
+    Compute the set of task-event fields that are locked by *any* matching 
conf rule.
+
+    **Floor-lock semantics**: a field is locked if *any* conf rule that 
matches the
+    current resolution context carries ``locked: true`` for that field — 
regardless
+    of whether that rule is the *winning* rule for the field.  This means a 
global
+    locked rule cannot be silently bypassed by a more-specific (but unlocked) 
conf
+    rule or by per-task authoring flags.
+
+    ``emit_task_events`` and ``emit`` both map to the ``"emit"`` field in
+    :class:`EmissionPolicy` for task-event decisions.
+    """
+    _locked: set[str] = set()
+    for rule in task_rules + dag_rules + operator_rules + global_rules:
+        if not rule.get(RULE_LOCKED, False):
+            continue
+        controls = rule[RULE_CONTROLS]
+        if EMIT_TASK_EVENTS in controls or EMIT in controls:
+            _locked.add(EMIT)
+        for field in _LOCKABLE_TASK_FIELDS:
+            if field in controls:
+                _locked.add(field)
+    return frozenset(_locked)
+
+
+def _apply_task_authoring(
+    config: EmissionPolicy,
+    locked_fields: frozenset[str],
+    operator: AnyOperator,
+    context: str,
+) -> EmissionPolicy:
+    """
+    Apply per-task authoring flags on top of *config*.
+
+    Authoring flags are stored on operator objects by
+    
:func:`~airflow.providers.openlineage.api.emission_policy.extend_global_openlineage_emission_policy`.
+    Fields listed in *locked_fields* are protected — attempts to override them 
are
+    logged at INFO level and silently ignored.  ``emit_task_events`` in the 
authoring
+    flags takes precedence over ``emit`` for the task ``emit`` field, 
mirroring the
+    same precedence used in conf rule resolution.
+    """
+    flags = _read_param(operator, OL_EMISSION_POLICY_TASK_PARAM)
+    if not flags:
+        return config
+
+    updates: dict[str, bool] = {}
+
+    effective_emit: bool | None = flags.get(EMIT_TASK_EVENTS, flags.get(EMIT))
+    if effective_emit is not None:
+        if EMIT in locked_fields:
+            log.info(
+                "emission_policy: authoring 'emit' override ignored for %s — 
conf rule is locked",
+                context,
+            )
+        else:
+            updates[EMIT] = effective_emit
+
+    for field in (EXTRACT_OPERATOR_METADATA, INCLUDE_SOURCE_CODE, 
HOOK_LINEAGE, INCLUDE_FULL_TASK_INFO):
+        if field not in flags:
+            continue
+        if field in locked_fields:
+            log.info(
+                "emission_policy: authoring '%s' override ignored for %s — 
conf rule is locked",
+                field,
+                context,
+            )
+        else:
+            updates[field] = flags[field]
+
+    if not updates:
+        return config
+
+    _audit_log_updates(updates, context, source={"_authoring": 
"extend_global_openlineage_emission_policy"})
+    return replace(config, **updates)
+
+
+def _apply_dag_authoring(
+    config: EmissionPolicy,
+    locked_fields: frozenset[str],
+    dag: object,
+    context: str,
+) -> EmissionPolicy:
+    """
+    Apply per-DAG authoring flags on top of *config* for dag-run events.
+
+    ``emit_dag_events`` in the authoring flags takes precedence over ``emit``,
+    mirroring conf rule resolution.
+    """
+    flags = _read_param(dag, OL_EMISSION_POLICY_DAG_PARAM)
+    if not flags:
+        return config
+
+    updates: dict[str, bool] = {}
+
+    effective_emit: bool | None = flags.get(EMIT_DAG_EVENTS, flags.get(EMIT))
+    if effective_emit is not None:
+        if EMIT in locked_fields:
+            log.info(
+                "emission_policy: authoring 'emit' (dag event) override 
ignored for %s — conf rule is locked",
+                context,
+            )
+        else:
+            updates[EMIT] = effective_emit
+
+    if not updates:
+        return config
+
+    _audit_log_updates(updates, context, source={"_authoring": 
"extend_global_openlineage_emission_policy"})
+    return replace(config, **updates)
+
+
+def _resolve_conf_pure_task(
+    rules: list[dict],
+    fqcn: str,
+    dag_id: str,
+    task_id: str,
+) -> tuple[EmissionPolicy, frozenset[str]]:
+    """
+    Pure, deterministic conf resolution for task events.
+
+    Takes the already-parsed *rules* list and a pre-computed *fqcn* string.
+    ``emission_policy()`` is cached at the conf layer, so calling this directly
+    is cheap without any additional memoisation.
+    """
+    task_rules, dag_rules, operator_rules, global_rules = 
_classify_rules(rules, fqcn, dag_id, task_id)
+
+    defaults = EmissionPolicy.defaults()
+    tiers = [task_rules, dag_rules, operator_rules, global_rules]
+    context = f"task '{task_id}' in dag '{dag_id}'"
+
+    emit, emit_rule = _resolve_emit_with_source(tiers, "task", defaults.emit)
+    extract_operator_metadata, em_rule = _resolve_field_with_source(
+        tiers, EXTRACT_OPERATOR_METADATA, defaults.extract_operator_metadata
+    )
+    include_source_code, isc_rule = _resolve_field_with_source(
+        tiers, INCLUDE_SOURCE_CODE, defaults.include_source_code
+    )
+    hook_lineage, hl_rule = _resolve_field_with_source(tiers, HOOK_LINEAGE, 
defaults.hook_lineage)
+    include_full_task_info, ift_rule = _resolve_field_with_source(
+        tiers, INCLUDE_FULL_TASK_INFO, defaults.include_full_task_info
+    )
+
+    if not emit:
+        _log_disabled(EMIT, context, emit_rule)
+    if not extract_operator_metadata:
+        _log_disabled(EXTRACT_OPERATOR_METADATA, context, em_rule)
+    if not include_source_code:
+        _log_disabled(INCLUDE_SOURCE_CODE, context, isc_rule)
+    if not hook_lineage:
+        _log_disabled(HOOK_LINEAGE, context, hl_rule)
+    if include_full_task_info:
+        _log_enabled(INCLUDE_FULL_TASK_INFO, context, ift_rule)
+
+    locked_fields = _compute_locked_task_fields(task_rules, dag_rules, 
operator_rules, global_rules)
+
+    return (
+        EmissionPolicy(
+            emit=emit,
+            extract_operator_metadata=extract_operator_metadata,
+            include_source_code=include_source_code,
+            hook_lineage=hook_lineage,
+            include_full_task_info=include_full_task_info,
+        ),
+        locked_fields,
+    )
+
+
+def _resolve_conf_pure_dag(
+    rules: list[dict],
+    dag_id: str,
+) -> tuple[EmissionPolicy, frozenset[str]]:
+    """
+    Pure, deterministic conf resolution for dag-run events.
+
+    Returns ``(config, locked_fields)``. Only ``emit`` is meaningful for dag 
events;
+    all other fields in :class:`EmissionPolicy` carry their built-in defaults.
+
+    **Floor-lock semantics** (see :func:`_compute_locked_task_fields`): a field
+    is locked if *any* matching conf rule carries ``locked: true`` for that 
field.
+
+    ``emission_policy()`` is cached at the conf layer, so calling this directly
+    is cheap without any additional memoisation.
+    """
+    defaults = EmissionPolicy.defaults()
+    dag_rules, global_rules = _classify_dag_rules(rules, dag_id)
+    tiers = [dag_rules, global_rules]
+    context = f"dag event '{dag_id}'"
+
+    emit, emit_rule = _resolve_emit_with_source(tiers, "dag", defaults.emit)
+
+    if not emit:
+        _log_disabled(EMIT, context, emit_rule)
+
+    _locked: set[str] = set()
+    for rule in dag_rules + global_rules:
+        if not rule.get(RULE_LOCKED, False):
+            continue
+        controls = rule[RULE_CONTROLS]
+        if EMIT_DAG_EVENTS in controls or EMIT in controls:
+            _locked.add(EMIT)
+
+    return (
+        EmissionPolicy(
+            emit=emit,
+            extract_operator_metadata=defaults.extract_operator_metadata,
+            include_source_code=defaults.include_source_code,
+            hook_lineage=defaults.hook_lineage,
+            include_full_task_info=defaults.include_full_task_info,
+        ),
+        frozenset(_locked),
+    )
+
+
+def resolve_task_emission_policy(
+    operator: AnyOperator,
+    dag_id: str,
+    task_id: str,
+) -> EmissionPolicy:
+    """
+    Resolve the emission policy for a task-level event.
+
+    This is the **single authoritative entry point** for task event emission 
decisions.
+
+    Any active legacy options (``disabled_for_operators``, 
``disable_source_code``,
+    ``include_full_task_info``, ``selective_enable``) are *always* translated 
into equivalent
+    ``emission_policy`` rules, prepended to the user-provided rules so user 
rules win within
+    each tier (last-wins). A ``DeprecationWarning`` is issued whenever a 
legacy option
+    contributes a rule — silence it by migrating those options into 
``emission_policy``
+    exclusively.
+
+    :param operator: The Airflow operator/task object.
+    :param dag_id: The DAG ID for this task instance.
+    :param task_id: The task ID for this task instance.
+    """
+    context = f"task '{task_id}' in dag '{dag_id}'"
+    user_rules = _ol_conf.emission_policy()
+
+    legacy_rules = _synthesize_legacy_rules_task(operator, dag_id, task_id)
+    _warn_legacy_with_emission_policy(legacy_rules, scope="task")
+
+    all_rules = legacy_rules + user_rules
+    from airflow.providers.openlineage.utils.utils import 
get_fully_qualified_class_name
+
+    fqcn = get_fully_qualified_class_name(operator)
+    config, locked_fields = _resolve_conf_pure_task(all_rules, fqcn, dag_id, 
task_id)
+    return _apply_task_authoring(config, locked_fields, operator, context)
+
+
+def resolve_dag_emission_policy(dag_id: str, dag: object | None = None) -> 
EmissionPolicy:
+    """
+    Resolve the emission policy for a DAG-level event.
+
+    Only ``emit`` is meaningful for DAG events — ``extract_operator_metadata``,
+    ``include_source_code``, ``hook_lineage``, and ``include_full_task_info`` 
always
+    return their built-in defaults in the returned config (no extraction 
happens at
+    the DAG level).

Review Comment:
   resolve_dag_emission_policy returns a full five-field EmissionPolicy in which 
only emit is meaningful; the docstring has to warn readers that the other four 
fields always carry their built-in defaults. Consider splitting the type into 
TaskEmissionPolicy / DagEmissionPolicy, or having the DAG resolver return a 
plain bool: every DAG call site already reads only .emit.
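
   A rough sketch of what that split could look like, not code from the PR. The 
class names come from the suggestion in this comment; the field defaults, the 
literal rule keys ("scope", "controls", "dag_id") and the simplified 
resolve_dag_policy helper are assumptions made only for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class DagEmissionPolicy:
    """Hypothetical DAG-level policy: only the emit decision exists."""

    emit: bool = True  # assumed default


@dataclass(frozen=True)
class TaskEmissionPolicy:
    """Hypothetical task-level policy carrying the five fields from the PR."""

    # All defaults below are assumptions, not values confirmed by the PR.
    emit: bool = True
    extract_operator_metadata: bool = True
    include_source_code: bool = True
    hook_lineage: bool = True
    include_full_task_info: bool = False


def resolve_dag_policy(rules: list[dict], dag_id: str) -> DagEmissionPolicy:
    """
    Simplified stand-in for the DAG resolver (hypothetical helper).

    The dag-scoped tier is checked before the global tier, the last rule wins
    within a tier, and emit_dag_events takes precedence over emit in a rule.
    Locking and authoring flags are deliberately left out.
    """
    dag_tier = [r for r in rules if r["scope"] == {"dag_id": dag_id}]
    global_tier = [r for r in rules if not r["scope"]]
    for tier in (dag_tier, global_tier):
        value: bool | None = None
        for rule in tier:
            controls = rule["controls"]
            if "emit_dag_events" in controls:
                value = controls["emit_dag_events"]
            elif "emit" in controls:
                value = controls["emit"]
        if value is not None:
            return DagEmissionPolicy(emit=value)
    return DagEmissionPolicy()


example_rules = [
    {"scope": {}, "controls": {"emit": False}},
    {"scope": {"dag_id": "etl"}, "controls": {"emit_dag_events": True}},
]
etl_policy = resolve_dag_policy(example_rules, "etl")
other_policy = resolve_dag_policy(example_rules, "other")
```

   With a DAG-only type, a call site that reaches for hook_lineage on a DAG 
policy fails at type-check time, so the docstring no longer needs to warn about 
fields that carry no meaning.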



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
