mobuchowski commented on code in PR #47444:
URL: https://github.com/apache/airflow/pull/47444#discussion_r1987275216
##########
providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py:
##########
@@ -90,38 +90,77 @@ def
inject_transport_information_into_spark_properties(properties: dict, context
)
return properties
- transport =
get_openlineage_listener().adapter.get_or_create_openlineage_client().transport
- if transport.kind != "http":
+ def _get_transport_information_as_spark_properties() -> dict:
+ """Retrieve transport information as Spark properties."""
+
+ def _get_transport_information(tp) -> dict:
+ props = {
+ "type": tp.kind,
+ "url": tp.url,
+ "endpoint": tp.endpoint,
+ "timeoutInMillis": str(
+ int(tp.timeout * 1000)
+ # convert to milliseconds, as required by
Spark integration
+ ),
+ }
+ if hasattr(tp, "compression") and tp.compression:
+ props["compression"] = str(tp.compression)
+
+ if hasattr(tp.config.auth, "api_key") and
tp.config.auth.get_bearer():
+ props["auth.type"] = "api_key"
+ props["auth.apiKey"] = tp.config.auth.get_bearer()
+
+ if hasattr(tp.config, "custom_headers") and
tp.config.custom_headers:
+ for key, value in tp.config.custom_headers.items():
+ props[f"headers.{key}"] = value
+ return props
+
+ def _format_transport(props: dict, transport: dict, name:
str | None):
+ for key, value in transport.items():
+ if name:
+
props[f"spark.openlineage.transport.transports.{name}.{key}"] = value
+ else:
+ props[f"spark.openlineage.transport.{key}"] =
value
+ return props
+
+ transport = (
+
get_openlineage_listener().adapter.get_or_create_openlineage_client().transport
+ )
+
+ if transport.kind == "composite":
+ http_transports = {}
+ for nested_transport in transport.transports:
+ if nested_transport.kind == "http":
+ http_transports[nested_transport.name] =
_get_transport_information(
+ nested_transport
+ )
+ if len(http_transports) == 0:
+ log.info(
+ "OpenLineage transport type `composite` does
not contain http transport. Skipping "
+ "injection of OpenLineage transport
information into Spark properties.",
+ )
+ return {}
Review Comment:
Changed the log level here to `info` as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org