kacpermuda commented on code in PR #47444:
URL: https://github.com/apache/airflow/pull/47444#discussion_r1987061181
##########
providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py:
##########
@@ -58,15 +59,27 @@ def lineage_run_id(task_instance: TaskInstance):
For more information take a look at the guide:
:ref:`howto/macros:openlineage`
"""
- if hasattr(task_instance, "logical_date"):
- logical_date = task_instance.logical_date
+ if AIRFLOW_V_3_0_PLUS:
+ context = task_instance.get_template_context()
+ if hasattr(task_instance, "dag_run"):
+ dag_run = task_instance.dag_run
+ elif hasattr(context, "dag_run"):
+ dag_run = context["dag_run"]
+ else:
+ return ""
Review Comment:
Should we silently return an empty string here? In what scenario would there
be no dag_run, and can that actually happen?
##########
providers/openlineage/src/airflow/providers/openlineage/utils/spark.py:
##########
@@ -53,35 +53,66 @@ def
_get_parent_job_information_as_spark_properties(context: Context) -> dict:
def _get_transport_information_as_spark_properties() -> dict:
"""Retrieve transport information as Spark properties."""
- transport =
get_openlineage_listener().adapter.get_or_create_openlineage_client().transport
- if transport.kind != "http":
- log.info(
- "OpenLineage transport type `%s` does not support automatic "
- "injection of OpenLineage transport information into Spark
properties.",
- transport.kind,
- )
- return {}
-
- properties = {
- "spark.openlineage.transport.type": transport.kind,
- "spark.openlineage.transport.url": transport.url,
- "spark.openlineage.transport.endpoint": transport.endpoint,
- "spark.openlineage.transport.timeoutInMillis": str(
- int(transport.timeout * 1000) # convert to milliseconds, as
required by Spark integration
- ),
- }
- if transport.compression:
- properties["spark.openlineage.transport.compression"] =
str(transport.compression)
- if hasattr(transport.config.auth, "api_key") and
transport.config.auth.get_bearer():
- properties["spark.openlineage.transport.auth.type"] = "api_key"
- properties["spark.openlineage.transport.auth.apiKey"] =
transport.config.auth.get_bearer()
+ def _get_transport_information(tp) -> dict:
+ properties = {
+ "type": tp.kind,
+ "url": tp.url,
+ "endpoint": tp.endpoint,
+ "timeoutInMillis": str(
+ int(tp.timeout * 1000) # convert to milliseconds, as required
by Spark integration
+ ),
+ }
+ if hasattr(tp, "compression") and tp.compression:
+ properties["compression"] = str(tp.compression)
+
+ if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
+ properties["auth.type"] = "api_key"
+ properties["auth.apiKey"] = tp.config.auth.get_bearer()
+
+ if hasattr(tp.config, "custom_headers") and tp.config.custom_headers:
+ for key, value in tp.config.custom_headers.items():
+ properties[f"headers.{key}"] = value
+ return properties
+
+ def _format_transport(props: dict, transport: dict, name: str | None):
+ for key, value in transport.items():
+ if name:
+ props[f"spark.openlineage.transport.transports.{name}.{key}"]
= value
+ else:
+ props[f"spark.openlineage.transport.{key}"] = value
+ return props
- if hasattr(transport.config, "custom_headers") and
transport.config.custom_headers:
- for key, value in transport.config.custom_headers.items():
- properties[f"spark.openlineage.transport.headers.{key}"] = value
+ transport =
get_openlineage_listener().adapter.get_or_create_openlineage_client().transport
- return properties
+ if transport.kind == "composite":
+ http_transports = {}
+ for nested_transport in transport.transports:
+ if nested_transport.kind == "http":
+ http_transports[nested_transport.name] =
_get_transport_information(nested_transport)
+ if len(http_transports) == 0:
+ log.info(
+ "OpenLineage transport type `composite` does not contain http
transport. Skipping "
+ "injection of OpenLineage transport information into Spark
properties.",
+ )
+ return {}
Review Comment:
I think we should log a warning here when we encounter transports other than
HTTP, instead of silently skipping them. This code path is only reached by users
who opted in, so we can afford to show some extra logs.
Why a warning and not info, like the log below? With a single (non-composite)
transport that isn't HTTP, no events are delivered at all, which is easy to spot.
With a composite transport, users may expect that the full configuration was
translated to Spark and that all of their backends will receive events. In fact,
only the HTTP endpoints receive them. Without a warning, it would be unclear to
them why some backends are missing events.
##########
providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py:
##########
@@ -90,38 +90,77 @@ def
inject_transport_information_into_spark_properties(properties: dict, context
)
return properties
- transport =
get_openlineage_listener().adapter.get_or_create_openlineage_client().transport
- if transport.kind != "http":
+ def _get_transport_information_as_spark_properties() -> dict:
+ """Retrieve transport information as Spark properties."""
+
+ def _get_transport_information(tp) -> dict:
+ props = {
+ "type": tp.kind,
+ "url": tp.url,
+ "endpoint": tp.endpoint,
+ "timeoutInMillis": str(
+ int(tp.timeout * 1000)
+ # convert to milliseconds, as required by
Spark integration
+ ),
+ }
+ if hasattr(tp, "compression") and tp.compression:
+ props["compression"] = str(tp.compression)
+
+ if hasattr(tp.config.auth, "api_key") and
tp.config.auth.get_bearer():
+ props["auth.type"] = "api_key"
+ props["auth.apiKey"] = tp.config.auth.get_bearer()
+
+ if hasattr(tp.config, "custom_headers") and
tp.config.custom_headers:
+ for key, value in tp.config.custom_headers.items():
+ props[f"headers.{key}"] = value
+ return props
+
+ def _format_transport(props: dict, transport: dict, name:
str | None):
+ for key, value in transport.items():
+ if name:
+
props[f"spark.openlineage.transport.transports.{name}.{key}"] = value
+ else:
+ props[f"spark.openlineage.transport.{key}"] =
value
+ return props
+
+ transport = (
+
get_openlineage_listener().adapter.get_or_create_openlineage_client().transport
+ )
+
+ if transport.kind == "composite":
+ http_transports = {}
+ for nested_transport in transport.transports:
+ if nested_transport.kind == "http":
+ http_transports[nested_transport.name] =
_get_transport_information(
+ nested_transport
+ )
+ if len(http_transports) == 0:
+ log.info(
+ "OpenLineage transport type `composite` does
not contain http transport. Skipping "
+ "injection of OpenLineage transport
information into Spark properties.",
+ )
+ return {}
Review Comment:
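Same as in the other comment: log a warning when non-HTTP transports inside a
composite transport are skipped, instead of skipping them silently.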
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]