kacpermuda commented on code in PR #47444:
URL: https://github.com/apache/airflow/pull/47444#discussion_r1987061181


##########
providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py:
##########
@@ -58,15 +59,27 @@ def lineage_run_id(task_instance: TaskInstance):
         For more information take a look at the guide:
         :ref:`howto/macros:openlineage`
     """
-    if hasattr(task_instance, "logical_date"):
-        logical_date = task_instance.logical_date
+    if AIRFLOW_V_3_0_PLUS:
+        context = task_instance.get_template_context()
+        if hasattr(task_instance, "dag_run"):
+            dag_run = task_instance.dag_run
+        elif hasattr(context, "dag_run"):
+            dag_run = context["dag_run"]
+        else:
+            return ""

Review Comment:
   Should we silently return an empty string here? What is the scenario in 
which there is no dag_run? Can this actually happen?



##########
providers/openlineage/src/airflow/providers/openlineage/utils/spark.py:
##########
@@ -53,35 +53,66 @@ def 
_get_parent_job_information_as_spark_properties(context: Context) -> dict:
 
 def _get_transport_information_as_spark_properties() -> dict:
     """Retrieve transport information as Spark properties."""
-    transport = 
get_openlineage_listener().adapter.get_or_create_openlineage_client().transport
-    if transport.kind != "http":
-        log.info(
-            "OpenLineage transport type `%s` does not support automatic "
-            "injection of OpenLineage transport information into Spark 
properties.",
-            transport.kind,
-        )
-        return {}
-
-    properties = {
-        "spark.openlineage.transport.type": transport.kind,
-        "spark.openlineage.transport.url": transport.url,
-        "spark.openlineage.transport.endpoint": transport.endpoint,
-        "spark.openlineage.transport.timeoutInMillis": str(
-            int(transport.timeout * 1000)  # convert to milliseconds, as 
required by Spark integration
-        ),
-    }
-    if transport.compression:
-        properties["spark.openlineage.transport.compression"] = 
str(transport.compression)
 
-    if hasattr(transport.config.auth, "api_key") and 
transport.config.auth.get_bearer():
-        properties["spark.openlineage.transport.auth.type"] = "api_key"
-        properties["spark.openlineage.transport.auth.apiKey"] = 
transport.config.auth.get_bearer()
+    def _get_transport_information(tp) -> dict:
+        properties = {
+            "type": tp.kind,
+            "url": tp.url,
+            "endpoint": tp.endpoint,
+            "timeoutInMillis": str(
+                int(tp.timeout * 1000)  # convert to milliseconds, as required 
by Spark integration
+            ),
+        }
+        if hasattr(tp, "compression") and tp.compression:
+            properties["compression"] = str(tp.compression)
+
+        if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
+            properties["auth.type"] = "api_key"
+            properties["auth.apiKey"] = tp.config.auth.get_bearer()
+
+        if hasattr(tp.config, "custom_headers") and tp.config.custom_headers:
+            for key, value in tp.config.custom_headers.items():
+                properties[f"headers.{key}"] = value
+        return properties
+
+    def _format_transport(props: dict, transport: dict, name: str | None):
+        for key, value in transport.items():
+            if name:
+                props[f"spark.openlineage.transport.transports.{name}.{key}"] 
= value
+            else:
+                props[f"spark.openlineage.transport.{key}"] = value
+        return props
 
-    if hasattr(transport.config, "custom_headers") and 
transport.config.custom_headers:
-        for key, value in transport.config.custom_headers.items():
-            properties[f"spark.openlineage.transport.headers.{key}"] = value
+    transport = 
get_openlineage_listener().adapter.get_or_create_openlineage_client().transport
 
-    return properties
+    if transport.kind == "composite":
+        http_transports = {}
+        for nested_transport in transport.transports:
+            if nested_transport.kind == "http":
+                http_transports[nested_transport.name] = 
_get_transport_information(nested_transport)
+        if len(http_transports) == 0:
+            log.info(
+                "OpenLineage transport type `composite` does not contain http 
transport. Skipping "
+                "injection of OpenLineage transport information into Spark 
properties.",
+            )
+            return {}

Review Comment:
   I think we should log a warning here if we encounter transports other than 
HTTP, instead of silently skipping them. This code path is only reached by 
users who have opted in, so we can afford to show some extra logs.
   
   Why a warning and not info, like below? With a simple transport that is not 
HTTP, the events will simply not be delivered - that is easy to spot. With a 
composite transport, however, users may expect that the full translation to 
Spark has happened and that events will be delivered to their HTTP endpoints, 
so it may be misleading/unclear to them why not all backends have received 
events.



##########
providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py:
##########
@@ -90,38 +90,77 @@ def 
inject_transport_information_into_spark_properties(properties: dict, context
                     )
                     return properties
 
-                transport = 
get_openlineage_listener().adapter.get_or_create_openlineage_client().transport
-                if transport.kind != "http":
+                def _get_transport_information_as_spark_properties() -> dict:
+                    """Retrieve transport information as Spark properties."""
+
+                    def _get_transport_information(tp) -> dict:
+                        props = {
+                            "type": tp.kind,
+                            "url": tp.url,
+                            "endpoint": tp.endpoint,
+                            "timeoutInMillis": str(
+                                int(tp.timeout * 1000)
+                                # convert to milliseconds, as required by 
Spark integration
+                            ),
+                        }
+                        if hasattr(tp, "compression") and tp.compression:
+                            props["compression"] = str(tp.compression)
+
+                        if hasattr(tp.config.auth, "api_key") and 
tp.config.auth.get_bearer():
+                            props["auth.type"] = "api_key"
+                            props["auth.apiKey"] = tp.config.auth.get_bearer()
+
+                        if hasattr(tp.config, "custom_headers") and 
tp.config.custom_headers:
+                            for key, value in tp.config.custom_headers.items():
+                                props[f"headers.{key}"] = value
+                        return props
+
+                    def _format_transport(props: dict, transport: dict, name: 
str | None):
+                        for key, value in transport.items():
+                            if name:
+                                
props[f"spark.openlineage.transport.transports.{name}.{key}"] = value
+                            else:
+                                props[f"spark.openlineage.transport.{key}"] = 
value
+                        return props
+
+                    transport = (
+                        
get_openlineage_listener().adapter.get_or_create_openlineage_client().transport
+                    )
+
+                    if transport.kind == "composite":
+                        http_transports = {}
+                        for nested_transport in transport.transports:
+                            if nested_transport.kind == "http":
+                                http_transports[nested_transport.name] = 
_get_transport_information(
+                                    nested_transport
+                                )
+                        if len(http_transports) == 0:
+                            log.info(
+                                "OpenLineage transport type `composite` does 
not contain http transport. Skipping "
+                                "injection of OpenLineage transport 
information into Spark properties.",
+                            )
+                            return {}

Review Comment:
   Same as in the other comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to