kacpermuda commented on code in PR #59094:
URL: https://github.com/apache/airflow/pull/59094#discussion_r2592555297


##########
providers/openlineage/src/airflow/providers/openlineage/utils/spark.py:
##########
@@ -35,6 +35,43 @@
 log = logging.getLogger(__name__)
 
 
+class ParentJobInformation(NamedTuple):
+    """Container for OpenLineage parent job information."""
+
+    parent_job_namespace: str
+    parent_job_name: str
+    parent_run_id: str
+    root_parent_job_namespace: str
+    root_parent_job_name: str
+    root_parent_run_id: str
+
+
+def get_parent_job_information(context: Context) -> ParentJobInformation | 
None:
+    """
+    Retrieve parent job information from the Airflow context.
+
+    This function extracts OpenLineage parent job details from the task 
instance,
+    which can be used by various integrations (Spark, Glue, etc.) to propagate
+    lineage information to child jobs.
+
+    Args:
+        context: The Airflow context containing task instance information.
+
+    Returns:
+        ParentJobInformation containing namespace, job name, and run IDs
+        for both parent and root parent.
+    """
+    ti = context["ti"]
+    return ParentJobInformation(
+        parent_job_namespace=lineage_job_namespace(),
+        parent_job_name=lineage_job_name(ti),  # type: ignore[arg-type]
+        parent_run_id=lineage_run_id(ti),  # type: ignore[arg-type]
+        root_parent_job_namespace=lineage_job_namespace(),

Review Comment:
   We should use `lineage_root_job_namespace` here. Must have missed that when 
adding that macro.



##########
providers/amazon/src/airflow/providers/amazon/aws/utils/openlineage.py:
##########
@@ -136,3 +141,148 @@ def get_identity_column_lineage_facet(
         }
     )
     return column_lineage_facet
+
+
+def _parse_glue_customer_env_vars(env_vars_string: str | None) -> dict[str, 
str]:
+    """
+    Parse the --customer-driver-env-vars format into a dict.
+
+    Format: "KEY1=VAL1,KEY2=\"val2,val2 val2\""
+    - Simple values: KEY=VALUE
+    - Values with commas/spaces: KEY="value with, spaces"
+
+    Args:
+        env_vars_string: The environment variables string from Glue script 
args.
+
+    Returns:
+        Dict of key-value pairs.
+    """
+    if not env_vars_string:
+        return {}
+
+    result: dict[str, str] = {}
+    current = ""
+    in_quotes = False
+
+    for char in env_vars_string:
+        if char == '"' and (not current or current[-1] != "\\"):
+            in_quotes = not in_quotes
+            current += char
+        elif char == "," and not in_quotes:
+            if "=" in current:
+                key, value = current.split("=", 1)
+                # Strip surrounding quotes if present
+                value = value.strip()
+                if value.startswith('"') and value.endswith('"'):
+                    value = value[1:-1]
+                result[key.strip()] = value
+            current = ""
+        else:
+            current += char
+
+    # Handle last element
+    if current and "=" in current:
+        key, value = current.split("=", 1)
+        value = value.strip()
+        if value.startswith('"') and value.endswith('"'):
+            value = value[1:-1]
+        result[key.strip()] = value
+
+    return result
+
+
+def _format_glue_customer_env_vars(env_vars: dict[str, str]) -> str:
+    """
+    Format a dict back into the --customer-driver-env-vars string format.
+
+    - Values containing commas, spaces, or quotes need quoting
+    - Quotes within values need escaping
+
+    Args:
+        env_vars: Dict of environment variables.
+
+    Returns:
+        String in format "KEY1=VAL1,KEY2=\"val2\""
+    """
+    parts = []
+    for key, value in env_vars.items():
+        # Quote if contains special chars
+        if "," in value or " " in value or '"' in value:
+            escaped_value = value.replace('"', '\\"')
+            parts.append(f'{key}="{escaped_value}"')
+        else:
+            parts.append(f"{key}={value}")
+    return ",".join(parts)
+
+
+def _is_parent_job_info_present_in_glue_env_vars(script_args: dict[str, Any]) 
-> bool:
+    """
+    Check if any OpenLineage parent job env vars are already set.
+
+    Args:
+        script_args: The Glue job's script_args dict.
+
+    Returns:
+        True if any OL parent job env vars are present.
+    """
+    # Check --customer-driver-env-vars
+    driver_env_vars_str = script_args.get("--customer-driver-env-vars", "")
+    driver_env_vars = _parse_glue_customer_env_vars(driver_env_vars_str)
+
+    # Also check --customer-executor-env-vars
+    executor_env_vars_str = script_args.get("--customer-executor-env-vars", "")
+    executor_env_vars = _parse_glue_customer_env_vars(executor_env_vars_str)
+
+    all_env_vars = {**driver_env_vars, **executor_env_vars}
+
+    # Check if ANY OpenLineage parent env var is present
+    return any(
+        key.startswith("OPENLINEAGE_PARENT") or 
key.startswith("OPENLINEAGE_ROOT_PARENT")
+        for key in all_env_vars
+    )
+
+
+def inject_parent_job_information_into_glue_script_args(
+    script_args: dict[str, Any], context: Context
+) -> dict[str, Any]:
+    """
+    Inject OpenLineage parent job info into Glue script_args.
+
+    The parent job information is injected via the --customer-driver-env-vars 
argument,
+    which sets environment variables in the Spark driver process.
+
+    - If OpenLineage provider is not available, skip injection

Review Comment:
   I think this will still inject even when the OL provider is disabled, since 
   the compat can still return valid info in that case. Maybe add an additional 
   check for that, like I did 
   [here](https://github.com/apache/airflow/pull/58672/files#diff-57bc5226c951fac469beeef4ea2bbcd44e58037a8a501d6226416b6dab030a45R34)?



##########
providers/amazon/src/airflow/providers/amazon/aws/utils/openlineage.py:
##########
@@ -136,3 +141,148 @@ def get_identity_column_lineage_facet(
         }
     )
     return column_lineage_facet
+
+
+def _parse_glue_customer_env_vars(env_vars_string: str | None) -> dict[str, 
str]:
+    """
+    Parse the --customer-driver-env-vars format into a dict.
+
+    Format: "KEY1=VAL1,KEY2=\"val2,val2 val2\""
+    - Simple values: KEY=VALUE
+    - Values with commas/spaces: KEY="value with, spaces"
+
+    Args:
+        env_vars_string: The environment variables string from Glue script 
args.
+
+    Returns:
+        Dict of key-value pairs.
+    """
+    if not env_vars_string:
+        return {}
+
+    result: dict[str, str] = {}
+    current = ""
+    in_quotes = False
+
+    for char in env_vars_string:
+        if char == '"' and (not current or current[-1] != "\\"):
+            in_quotes = not in_quotes
+            current += char
+        elif char == "," and not in_quotes:
+            if "=" in current:
+                key, value = current.split("=", 1)
+                # Strip surrounding quotes if present
+                value = value.strip()
+                if value.startswith('"') and value.endswith('"'):
+                    value = value[1:-1]
+                result[key.strip()] = value
+            current = ""
+        else:
+            current += char
+
+    # Handle last element
+    if current and "=" in current:
+        key, value = current.split("=", 1)
+        value = value.strip()
+        if value.startswith('"') and value.endswith('"'):
+            value = value[1:-1]
+        result[key.strip()] = value
+
+    return result
+
+
+def _format_glue_customer_env_vars(env_vars: dict[str, str]) -> str:
+    """
+    Format a dict back into the --customer-driver-env-vars string format.
+
+    - Values containing commas, spaces, or quotes need quoting
+    - Quotes within values need escaping
+
+    Args:
+        env_vars: Dict of environment variables.
+
+    Returns:
+        String in format "KEY1=VAL1,KEY2=\"val2\""
+    """
+    parts = []
+    for key, value in env_vars.items():
+        # Quote if contains special chars
+        if "," in value or " " in value or '"' in value:
+            escaped_value = value.replace('"', '\\"')
+            parts.append(f'{key}="{escaped_value}"')
+        else:
+            parts.append(f"{key}={value}")
+    return ",".join(parts)
+
+
+def _is_parent_job_info_present_in_glue_env_vars(script_args: dict[str, Any]) 
-> bool:
+    """
+    Check if any OpenLineage parent job env vars are already set.
+
+    Args:
+        script_args: The Glue job's script_args dict.
+
+    Returns:
+        True if any OL parent job env vars are present.
+    """
+    # Check --customer-driver-env-vars
+    driver_env_vars_str = script_args.get("--customer-driver-env-vars", "")
+    driver_env_vars = _parse_glue_customer_env_vars(driver_env_vars_str)
+
+    # Also check --customer-executor-env-vars
+    executor_env_vars_str = script_args.get("--customer-executor-env-vars", "")
+    executor_env_vars = _parse_glue_customer_env_vars(executor_env_vars_str)
+
+    all_env_vars = {**driver_env_vars, **executor_env_vars}
+
+    # Check if ANY OpenLineage parent env var is present
+    return any(
+        key.startswith("OPENLINEAGE_PARENT") or 
key.startswith("OPENLINEAGE_ROOT_PARENT")
+        for key in all_env_vars
+    )
+
+
+def inject_parent_job_information_into_glue_script_args(

Review Comment:
   I think we should wrap it all in try/except (e.g., how I did 
[here](https://github.com/apache/airflow/pull/58672/files#diff-57bc5226c951fac469beeef4ea2bbcd44e58037a8a501d6226416b6dab030a45R153))
 just to make sure if any unexpected failure happens we handle it gracefully, 
return unmodified params and log a warning for the user.



##########
providers/amazon/src/airflow/providers/amazon/aws/utils/openlineage.py:
##########
@@ -136,3 +141,148 @@ def get_identity_column_lineage_facet(
         }
     )
     return column_lineage_facet
+
+
+def _parse_glue_customer_env_vars(env_vars_string: str | None) -> dict[str, 
str]:
+    """
+    Parse the --customer-driver-env-vars format into a dict.
+
+    Format: "KEY1=VAL1,KEY2=\"val2,val2 val2\""
+    - Simple values: KEY=VALUE
+    - Values with commas/spaces: KEY="value with, spaces"
+
+    Args:
+        env_vars_string: The environment variables string from Glue script 
args.
+
+    Returns:
+        Dict of key-value pairs.
+    """
+    if not env_vars_string:
+        return {}
+
+    result: dict[str, str] = {}
+    current = ""
+    in_quotes = False
+
+    for char in env_vars_string:
+        if char == '"' and (not current or current[-1] != "\\"):
+            in_quotes = not in_quotes
+            current += char
+        elif char == "," and not in_quotes:
+            if "=" in current:
+                key, value = current.split("=", 1)
+                # Strip surrounding quotes if present
+                value = value.strip()
+                if value.startswith('"') and value.endswith('"'):
+                    value = value[1:-1]
+                result[key.strip()] = value
+            current = ""
+        else:
+            current += char
+
+    # Handle last element
+    if current and "=" in current:
+        key, value = current.split("=", 1)
+        value = value.strip()
+        if value.startswith('"') and value.endswith('"'):
+            value = value[1:-1]
+        result[key.strip()] = value

Review Comment:
   I see a lot of quoting and other edge cases here, but the tests are not really 
   clear on how they are handled (or maybe it's just not clear to me). Looking at 
   this, I'm not sure that we handle them all correctly. Can you add more 
   description to the docstring here and/or add more tests?



##########
providers/amazon/src/airflow/providers/amazon/aws/utils/openlineage.py:
##########
@@ -136,3 +141,148 @@ def get_identity_column_lineage_facet(
         }
     )
     return column_lineage_facet
+
+
+def _parse_glue_customer_env_vars(env_vars_string: str | None) -> dict[str, 
str]:
+    """
+    Parse the --customer-driver-env-vars format into a dict.
+
+    Format: "KEY1=VAL1,KEY2=\"val2,val2 val2\""
+    - Simple values: KEY=VALUE
+    - Values with commas/spaces: KEY="value with, spaces"
+
+    Args:
+        env_vars_string: The environment variables string from Glue script 
args.
+
+    Returns:
+        Dict of key-value pairs.
+    """
+    if not env_vars_string:
+        return {}
+
+    result: dict[str, str] = {}
+    current = ""
+    in_quotes = False
+
+    for char in env_vars_string:
+        if char == '"' and (not current or current[-1] != "\\"):
+            in_quotes = not in_quotes
+            current += char
+        elif char == "," and not in_quotes:
+            if "=" in current:
+                key, value = current.split("=", 1)
+                # Strip surrounding quotes if present
+                value = value.strip()
+                if value.startswith('"') and value.endswith('"'):
+                    value = value[1:-1]
+                result[key.strip()] = value
+            current = ""
+        else:
+            current += char
+
+    # Handle last element
+    if current and "=" in current:
+        key, value = current.split("=", 1)
+        value = value.strip()
+        if value.startswith('"') and value.endswith('"'):
+            value = value[1:-1]
+        result[key.strip()] = value
+
+    return result
+
+
+def _format_glue_customer_env_vars(env_vars: dict[str, str]) -> str:
+    """
+    Format a dict back into the --customer-driver-env-vars string format.
+
+    - Values containing commas, spaces, or quotes need quoting
+    - Quotes within values need escaping
+
+    Args:
+        env_vars: Dict of environment variables.
+
+    Returns:
+        String in format "KEY1=VAL1,KEY2=\"val2\""
+    """
+    parts = []
+    for key, value in env_vars.items():
+        # Quote if contains special chars
+        if "," in value or " " in value or '"' in value:
+            escaped_value = value.replace('"', '\\"')
+            parts.append(f'{key}="{escaped_value}"')
+        else:
+            parts.append(f"{key}={value}")
+    return ",".join(parts)
+
+
+def _is_parent_job_info_present_in_glue_env_vars(script_args: dict[str, Any]) 
-> bool:
+    """
+    Check if any OpenLineage parent job env vars are already set.
+
+    Args:
+        script_args: The Glue job's script_args dict.
+
+    Returns:
+        True if any OL parent job env vars are present.
+    """
+    # Check --customer-driver-env-vars
+    driver_env_vars_str = script_args.get("--customer-driver-env-vars", "")
+    driver_env_vars = _parse_glue_customer_env_vars(driver_env_vars_str)
+
+    # Also check --customer-executor-env-vars
+    executor_env_vars_str = script_args.get("--customer-executor-env-vars", "")
+    executor_env_vars = _parse_glue_customer_env_vars(executor_env_vars_str)
+
+    all_env_vars = {**driver_env_vars, **executor_env_vars}
+
+    # Check if ANY OpenLineage parent env var is present
+    return any(
+        key.startswith("OPENLINEAGE_PARENT") or 
key.startswith("OPENLINEAGE_ROOT_PARENT")
+        for key in all_env_vars
+    )
+
+
+def inject_parent_job_information_into_glue_script_args(
+    script_args: dict[str, Any], context: Context
+) -> dict[str, Any]:
+    """
+    Inject OpenLineage parent job info into Glue script_args.
+
+    The parent job information is injected via the --customer-driver-env-vars 
argument,
+    which sets environment variables in the Spark driver process.
+
+    - If OpenLineage provider is not available, skip injection
+    - If user already set any OPENLINEAGE_PARENT_* or 
OPENLINEAGE_ROOT_PARENT_* env vars,
+      skip injection to preserve user-provided values
+    - Merge with existing --customer-driver-env-vars if present
+    - Return new dict (don't mutate original)
+
+    Args:
+        script_args: The Glue job's script_args dict.
+        context: Airflow task context.
+
+    Returns:
+        Modified script_args with OpenLineage env vars injected.
+    """
+    info = get_parent_job_information(context)
+    if info is None:
+        return script_args
+
+    existing_env_vars_str = script_args.get("--customer-driver-env-vars", "")
+    existing_env_vars = _parse_glue_customer_env_vars(existing_env_vars_str)

Review Comment:
   Why do we need such complex parsing and then formatting? Can't we just check 
   whether the env vars string contains an "OPENLINEAGE_PARENT" or 
   "OPENLINEAGE_ROOT_PARENT" substring and, if not, simply append our variables 
   to the end of the env vars string? I feel like we risk a lot less that way 
   than when we parse and then re-format the customer's vars, and the result is 
   the same.



##########
providers/openlineage/src/airflow/providers/openlineage/utils/spark.py:
##########
@@ -35,6 +35,43 @@
 log = logging.getLogger(__name__)
 
 
+class ParentJobInformation(NamedTuple):
+    """Container for OpenLineage parent job information."""
+
+    parent_job_namespace: str
+    parent_job_name: str
+    parent_run_id: str
+    root_parent_job_namespace: str
+    root_parent_job_name: str
+    root_parent_run_id: str
+
+
+def get_parent_job_information(context: Context) -> ParentJobInformation | 
None:
+    """
+    Retrieve parent job information from the Airflow context.

Review Comment:
   This should probably not live in spark utils, as it's unrelated to spark. 
But since we do not have the "public api" on the provider yet, I think it's as 
good as any other place.



##########
providers/amazon/src/airflow/providers/amazon/aws/utils/openlineage.py:
##########
@@ -136,3 +141,148 @@ def get_identity_column_lineage_facet(
         }
     )
     return column_lineage_facet
+
+
+def _parse_glue_customer_env_vars(env_vars_string: str | None) -> dict[str, 
str]:
+    """
+    Parse the --customer-driver-env-vars format into a dict.
+
+    Format: "KEY1=VAL1,KEY2=\"val2,val2 val2\""
+    - Simple values: KEY=VALUE
+    - Values with commas/spaces: KEY="value with, spaces"
+
+    Args:
+        env_vars_string: The environment variables string from Glue script 
args.
+
+    Returns:
+        Dict of key-value pairs.
+    """
+    if not env_vars_string:
+        return {}
+
+    result: dict[str, str] = {}
+    current = ""
+    in_quotes = False
+
+    for char in env_vars_string:
+        if char == '"' and (not current or current[-1] != "\\"):
+            in_quotes = not in_quotes
+            current += char
+        elif char == "," and not in_quotes:
+            if "=" in current:
+                key, value = current.split("=", 1)
+                # Strip surrounding quotes if present
+                value = value.strip()
+                if value.startswith('"') and value.endswith('"'):
+                    value = value[1:-1]
+                result[key.strip()] = value
+            current = ""
+        else:
+            current += char
+
+    # Handle last element
+    if current and "=" in current:
+        key, value = current.split("=", 1)
+        value = value.strip()
+        if value.startswith('"') and value.endswith('"'):
+            value = value[1:-1]
+        result[key.strip()] = value
+
+    return result
+
+
+def _format_glue_customer_env_vars(env_vars: dict[str, str]) -> str:
+    """
+    Format a dict back into the --customer-driver-env-vars string format.
+
+    - Values containing commas, spaces, or quotes need quoting
+    - Quotes within values need escaping
+
+    Args:
+        env_vars: Dict of environment variables.
+
+    Returns:
+        String in format "KEY1=VAL1,KEY2=\"val2\""
+    """
+    parts = []
+    for key, value in env_vars.items():
+        # Quote if contains special chars
+        if "," in value or " " in value or '"' in value:
+            escaped_value = value.replace('"', '\\"')
+            parts.append(f'{key}="{escaped_value}"')
+        else:
+            parts.append(f"{key}={value}")
+    return ",".join(parts)
+
+
+def _is_parent_job_info_present_in_glue_env_vars(script_args: dict[str, Any]) 
-> bool:

Review Comment:
   I don't see this actually being used anywhere. There is also no test that 
   checks that user-provided OL variables are not overwritten.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to